fix journal mode, tests pass

This commit is contained in:
Evgeny Poberezkin
2024-12-16 22:25:21 +00:00
parent c8cc2f262b
commit 7a25174866
8 changed files with 116 additions and 72 deletions
+1
View File
@@ -279,6 +279,7 @@ library
build-depends:
case-insensitive ==1.2.*
, hashable ==1.4.*
, unix ==2.8.*
, websockets ==0.12.*
if impl(ghc >= 9.6.2)
build-depends:
@@ -309,25 +309,44 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where
(!count, !res) <- foldQueues 0 processQueue (0, mempty) ("", storePath)
putStrLn $ progress count
pure res
JournalStoreConfig {storePath, pathParts} = config
JournalStoreConfig {queueStoreType, storePath, pathParts} = config
processQueue :: (Int, a) -> (String, FilePath) -> IO (Int, a)
processQueue (!i, !r) (queueId, dir) = do
when (tty && i `mod` 100 == 0) $ putStr (progress i <> "\r") >> IO.hFlush stdout
r' <- case strDecode $ B.pack queueId of
Right rId ->
getQueue ms SRecipient rId >>= \case
Right q -> unStoreIO (getMsgQueue ms q) *> action q <* closeMsgQueue q
Left AUTH -> do
logWarn $ "STORE: processQueue, queue " <> T.pack queueId <> " was removed, removing " <> T.pack dir
removeQueueDirectory_ dir
validRecipientDir dir rId >>= \case
Just True ->
getQueue ms SRecipient rId >>= \case
Right q -> unStoreIO (getMsgQueue ms q) *> action q <* closeMsgQueue q
Left AUTH -> case queueStoreType of
SMSJournal -> do
logError $ "STORE: processQueue, queue " <> T.pack queueId <> " failed to open, directory: " <> T.pack dir
exitFailure
SMSHybrid -> do
logWarn $ "STORE: processQueue, queue " <> T.pack queueId <> " was removed, removing " <> T.pack dir
removeQueueDirectory_ dir
pure mempty
Left e -> do
logError $ "STORE: processQueue, error getting queue " <> T.pack queueId <> ", " <> tshow e
exitFailure
Just False -> pure mempty
Nothing -> do
logWarn $ "STORE: processQueue, skipping unknown entity " <> T.pack queueId <> ", directory: " <> T.pack dir
pure mempty
Left e -> do
logError $ "STORE: processQueue, error getting queue " <> T.pack queueId <> ", " <> tshow e
exitFailure
Left e -> do
logError $ "STORE: processQueue, message queue directory " <> T.pack dir <> " is invalid, " <> tshow e
exitFailure
pure (i + 1, r <> r')
validRecipientDir dir qId = do
ifM
(anyExists [queueRecPath, msgQueueStatePath])
(pure $ Just True)
(ifM (anyExists [queueRefPath QRSender, queueRefPath QRNotifier]) (pure $ Just False) (pure Nothing))
where
anyExists fs =
let paths = map (\f -> f dir qId) fs
in anyM $ map doesFileExist $ paths <> map (<> ".bak") paths
progress i = "Processed: " <> show i <> " queues"
foldQueues depth f acc (queueId, path) = do
let f' = if depth == pathParts - 1 then f else foldQueues (depth + 1) f
@@ -416,7 +435,7 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where
pure $ Left AUTH
load dir f = do
-- TODO [queues] read backup if exists, remove old timed backups
qr_ <- first STORE . strDecode <$> B.readFile f
qr_ <- first STORE . strDecode <$> B.readFile f
forM qr_ $ \qr -> do
lock <- atomically $ getMapLock (queueLocks ms) rId
q <- makeQueue dir lock rId qr
@@ -427,7 +446,7 @@ instance JournalStoreType s => MsgStoreClass (JournalMsgStore s) where
where
loadQueueRef = do
let dir = msgQueueDirectory ms qId
f = queueRefPath dir qRef qId
f = queueRefPath qRef dir qId
ifM (doesFileExist f) (loadRef f) $ do
atomically $ TM.insert qId Nothing m
pure $ Left AUTH
@@ -692,7 +711,7 @@ storeQueue sq@JournalQueue {queueRec} q = do
saveQueueRef :: JournalMsgStore 'MSJournal -> QueueRef -> QueueId -> RecipientId -> TMap QueueId (Maybe RecipientId) -> IO ()
saveQueueRef st qRef qId rId m = do
let dir = msgQueueDirectory st qId
f = queueRefPath dir qRef qId
f = queueRefPath qRef dir qId
createDirectoryIfMissing True dir
safeReplaceFile f $ strEncode rId
atomically $ TM.insert qId (Just rId) m
@@ -700,8 +719,10 @@ saveQueueRef st qRef qId rId m = do
deleteQueueRef :: JournalMsgStore 'MSJournal -> QueueRef -> QueueId -> TMap QueueId (Maybe RecipientId) -> IO ()
deleteQueueRef st qRef qId m = do
let dir = msgQueueDirectory st qId
f = queueRefPath dir qRef qId
f = queueRefPath qRef dir qId
whenM (doesFileExist f) $ removeFile f
-- TODO [queues] remove folder if it's empty or has only timed backups
-- TODO [queues] remove empty parent folders up to storage depth
atomically $ TM.delete qId m
storeQueue_ :: JournalQueue s -> QueueRec -> IO ()
@@ -712,11 +733,11 @@ storeQueue_ JournalQueue {recipientId, queueDirectory} q = do
safeReplaceFile :: FilePath -> ByteString -> IO ()
safeReplaceFile f s = ifM (doesFileExist f) replace (B.writeFile f s)
where
tempBackup = f <> ".bak"
temp = f <> ".bak"
replace = do
renameFile f tempBackup
renameFile f temp
B.writeFile f s
renameFile tempBackup =<< timedBackupName f
renameFile temp =<< timedBackupName f
timedBackupName :: FilePath -> IO FilePath
timedBackupName f = do
@@ -784,8 +805,8 @@ msgQueueDirectory JournalMsgStore {config = JournalStoreConfig {storePath, pathP
queueRecPath :: FilePath -> RecipientId -> FilePath
queueRecPath dir rId = dir </> (queueRecFileName <> "." <> B.unpack (strEncode rId) <> logFileExt)
queueRefPath :: FilePath -> QueueRef -> QueueId -> FilePath
queueRefPath dir qRef qId = dir </> (queueRefFileName qRef <> "." <> B.unpack (strEncode qId) <> queueRefFileExt)
queueRefPath :: QueueRef -> FilePath -> QueueId -> FilePath
queueRefPath qRef dir qId = dir </> (queueRefFileName qRef <> "." <> B.unpack (strEncode qId) <> queueRefFileExt)
msgQueueStatePath :: FilePath -> RecipientId -> FilePath
msgQueueStatePath dir rId = dir </> (queueLogFileName <> "." <> B.unpack (strEncode rId) <> logFileExt)
@@ -1015,7 +1036,8 @@ openFile f mode = do
pure h
hClose :: Handle -> IO ()
hClose h =
hClose h = do
IO.hFlush h
IO.hClose h `catchAny` \e -> do
name <- IO.hShow h
logError $ "STORE: hClose, " <> T.pack name <> ", " <> tshow e
+1 -1
View File
@@ -127,7 +127,7 @@ a =##> p =
withTimeout :: (HasCallStack, MonadUnliftIO m) => m a -> (HasCallStack => a -> Expectation) -> m ()
withTimeout a test =
timeout 10_000000 a >>= \case
timeout 100_000000 a >>= \case
Nothing -> error "operation timed out"
Just t -> liftIO $ test t
+25 -19
View File
@@ -44,35 +44,41 @@ import Test.Hspec
msgStoreTests :: Spec
msgStoreTests = do
around (withMsgStore testSMTStoreConfig) $ describe "STM message store" someMsgStoreTests
around (withMsgStore testJournalStoreCfg) $ describe "Journal message store" $ do
someMsgStoreTests
it "should export and import journal store" testExportImportStore
describe "queue state" $ do
it "should restore queue state from the last line" testQueueState
it "should recover when message is written and state is not" testMessageState
describe "missing files" $ do
it "should create read file when missing" testReadFileMissing
it "should switch to write file when read file missing" testReadFileMissingSwitch
it "should create write file when missing" testWriteFileMissing
it "should create read file when read and write files are missing" testReadAndWriteFilesMissing
around (withMsgStore $ testJournalStoreCfg SMSHybrid) $
describe "Hybrid message store" $ do
journalMsgStoreTests
it "should export and import journal store" testExportImportStore
around (withMsgStore $ testJournalStoreCfg SMSJournal) $
describe "Journal message store" journalMsgStoreTests
where
someMsgStoreTests :: STMStoreClass s => SpecWith s
journalMsgStoreTests :: JournalStoreType s => SpecWith (JournalMsgStore s)
journalMsgStoreTests = do
someMsgStoreTests
describe "queue state" $ do
it "should restore queue state from the last line" testQueueState
it "should recover when message is written and state is not" testMessageState
describe "missing files" $ do
it "should create read file when missing" testReadFileMissing
it "should switch to write file when read file missing" testReadFileMissingSwitch
it "should create write file when missing" testWriteFileMissing
it "should create read file when read and write files are missing" testReadAndWriteFilesMissing
someMsgStoreTests :: MsgStoreClass s => SpecWith s
someMsgStoreTests = do
it "should get queue and store/read messages" testGetQueue
it "should not fail on EOF when changing read journal" testChangeReadJournal
withMsgStore :: STMStoreClass s => MsgStoreConfig s -> (s -> IO ()) -> IO ()
withMsgStore :: MsgStoreClass s => MsgStoreConfig s -> (s -> IO ()) -> IO ()
withMsgStore cfg = bracket (newMsgStore cfg) closeMsgStore
testSMTStoreConfig :: STMStoreConfig
testSMTStoreConfig = STMStoreConfig {storePath = Nothing, quota = 3}
testJournalStoreCfg :: JournalStoreConfig 'MSHybrid
testJournalStoreCfg =
testJournalStoreCfg :: SMSType s -> JournalStoreConfig s
testJournalStoreCfg queueStoreType =
JournalStoreConfig
{ storePath = testStoreMsgsDir,
pathParts = journalMsgStoreDepth,
queueStoreType = SMSHybrid,
queueStoreType,
quota = 3,
maxMsgCount = 4,
maxStateLines = 2,
@@ -115,7 +121,7 @@ testNewQueueRec g sndSecure = do
}
pure (rId, qr)
testGetQueue :: STMStoreClass s => s -> IO ()
testGetQueue :: MsgStoreClass s => s -> IO ()
testGetQueue ms = do
g <- C.newRandom
(rId, qr) <- testNewQueueRec g True
@@ -157,7 +163,7 @@ testGetQueue ms = do
(Nothing, Nothing) <- tryDelPeekMsg ms q mId8
void $ ExceptT $ deleteQueue ms q
testChangeReadJournal :: STMStoreClass s => s -> IO ()
testChangeReadJournal :: MsgStoreClass s => s -> IO ()
testChangeReadJournal ms = do
g <- C.newRandom
(rId, qr) <- testNewQueueRec g True
@@ -207,7 +213,7 @@ testExportImportStore ms = do
closeStoreLog sl
exportMessages False ms testStoreMsgsFile False
(B.readFile testStoreMsgsFile `shouldReturn`) =<< B.readFile (testStoreMsgsFile <> ".copy")
let cfg = (testJournalStoreCfg :: JournalStoreConfig 'MSHybrid) {storePath = testStoreMsgsDir2}
let cfg = (testJournalStoreCfg SMSHybrid :: JournalStoreConfig 'MSHybrid) {storePath = testStoreMsgsDir2}
ms' <- newMsgStore cfg
readWriteQueueStore testStoreLogFile ms' >>= closeStoreLog
stats@MessageStats {storedMsgsCount = 5, expiredMsgsCount = 0, storedQueues = 2} <-
+1 -1
View File
@@ -102,7 +102,7 @@ testSMPStoreLog testSuite tests =
replicateM_ 3 $ testReadWrite t
where
testReadWrite SLTC {compacted, state} = do
st <- newMsgStore testJournalStoreCfg
st <- newMsgStore $ testJournalStoreCfg SMSHybrid
l <- readWriteQueueStore testStoreLogFile st
storeState st `shouldReturn` state
closeStoreLog l
+5 -5
View File
@@ -111,7 +111,7 @@ testSMPClient_ host port vr client = do
| otherwise = Nothing
cfg :: ServerConfig
cfg = cfgMS (AMSType SMSHybrid) -- TODO [queues]
cfg = cfgMS (AMSType SMSJournal)
cfgMS :: AMSType -> ServerConfig
cfgMS msType =
@@ -190,14 +190,14 @@ proxyVRangeV8 :: VersionRangeSMP
proxyVRangeV8 = mkVersionRange batchCmdsSMPVersion sendingProxySMPVersion
withSmpServerStoreMsgLogOn :: HasCallStack => ATransport -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a
withSmpServerStoreMsgLogOn = (`withSmpServerStoreMsgLogOnMS` AMSType SMSHybrid) -- TODO [queues]
withSmpServerStoreMsgLogOn = (`withSmpServerStoreMsgLogOnMS` AMSType SMSJournal)
withSmpServerStoreMsgLogOnMS :: HasCallStack => ATransport -> AMSType -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a
withSmpServerStoreMsgLogOnMS t msType =
withSmpServerConfigOn t (cfgMS msType) {storeNtfsFile = Just testStoreNtfsFile, serverStatsBackupFile = Just testServerStatsBackupFile}
withSmpServerStoreLogOn :: HasCallStack => ATransport -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a
withSmpServerStoreLogOn = (`withSmpServerStoreLogOnMS` AMSType SMSHybrid) -- TODO [queues]
withSmpServerStoreLogOn = (`withSmpServerStoreLogOnMS` AMSType SMSJournal)
withSmpServerStoreLogOnMS :: HasCallStack => ATransport -> AMSType -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a
withSmpServerStoreLogOnMS t msType = withSmpServerConfigOn t (cfgMS msType) {serverStatsBackupFile = Just testServerStatsBackupFile}
@@ -252,7 +252,7 @@ smpServerTest ::
TProxy c ->
(Maybe TransmissionAuth, ByteString, ByteString, smp) ->
IO (Maybe TransmissionAuth, ByteString, ByteString, BrokerMsg)
smpServerTest _ t = runSmpTest (AMSType SMSHybrid) $ \h -> tPut' h t >> tGet' h -- TODO [queues]
smpServerTest _ t = runSmpTest (AMSType SMSJournal) $ \h -> tPut' h t >> tGet' h
where
tPut' :: THandleSMP c 'TClient -> (Maybe TransmissionAuth, ByteString, ByteString, smp) -> IO ()
tPut' h@THandle {params = THandleParams {sessionId, implySessId}} (sig, corrId, queueId, smp) = do
@@ -270,7 +270,7 @@ smpTestN :: (HasCallStack, Transport c) => AMSType -> Int -> (HasCallStack => [T
smpTestN msType n test' = runSmpTestN msType n test' `shouldReturn` ()
smpTest2' :: forall c. (HasCallStack, Transport c) => TProxy c -> (HasCallStack => THandleSMP c 'TClient -> THandleSMP c 'TClient -> IO ()) -> Expectation
smpTest2' = (`smpTest2` AMSType SMSHybrid) -- TODO [queues]
smpTest2' = (`smpTest2` AMSType SMSJournal)
smpTest2 :: forall c. (HasCallStack, Transport c) => TProxy c -> AMSType -> (HasCallStack => THandleSMP c 'TClient -> THandleSMP c 'TClient -> IO ()) -> Expectation
smpTest2 t msType = smpTest2Cfg (cfgMS msType) supportedClientSMPRelayVRange t
+41 -26
View File
@@ -19,7 +19,7 @@ import AgentTests.NotificationTests (removeFileIfExists)
import CoreTests.MsgStoreTests (testJournalStoreCfg)
import Control.Concurrent (ThreadId, killThread, threadDelay)
import Control.Concurrent.STM
import Control.Exception (SomeException, try)
import Control.Exception (SomeException, throwIO, try)
import Control.Monad
import Control.Monad.IO.Class
import Data.Bifunctor (first)
@@ -572,7 +572,6 @@ testWithStoreLog =
senderId1 <- newTVarIO NoEntity
senderId2 <- newTVarIO NoEntity
notifierId <- newTVarIO NoEntity
withSmpServerStoreLogOnMS at msType testPort . runTest t $ \h -> runClient t $ \h1 -> do
(sId1, rId1, rKey1, dhShared) <- createAndSecureQueue h sPub1
(rcvNtfPubDhKey, _) <- atomically $ C.generateKeyPair g
@@ -603,16 +602,14 @@ testWithStoreLog =
Resp "dabc" _ OK <- signSendRecv h rKey2 ("dabc", rId2, DEL)
pure ()
logSize testStoreLogFile `shouldReturn` 6
withHybridStore msType $
logSize testStoreLogFile `shouldReturn` 6
let cfg' = (cfgMS msType) {msgStoreType = AMSType SMSMemory, storeLogFile = Nothing, storeMsgsFile = Nothing}
withSmpServerConfigOn at cfg' testPort . runTest t $ \h -> do
sId1 <- readTVarIO senderId1
-- fails if store log is disabled
Resp "bcda" _ (ERR AUTH) <- signSendRecv h sKey1 ("bcda", sId1, _SEND "hello")
pure ()
withSmpServerStoreLogOnMS at msType testPort . runTest t $ \h -> runClient t $ \h1 -> do
-- this queue is restored
rId1 <- readTVarIO recipientId1
@@ -629,9 +626,9 @@ testWithStoreLog =
sId2 <- readTVarIO senderId2
Resp "cdab" _ (ERR AUTH) <- signSendRecv h sKey2 ("cdab", sId2, _SEND "hello too")
pure ()
logSize testStoreLogFile `shouldReturn` 1
removeFile testStoreLogFile
withHybridStore msType $ do
logSize testStoreLogFile `shouldReturn` 1
removeFile testStoreLogFile
where
runTest :: Transport c => TProxy c -> (THandleSMP c 'TClient -> IO ()) -> ThreadId -> Expectation
runTest _ test' server = do
@@ -641,11 +638,20 @@ testWithStoreLog =
runClient :: Transport c => TProxy c -> (THandleSMP c 'TClient -> IO ()) -> Expectation
runClient _ test' = testSMPClient test' `shouldReturn` ()
withHybridStore :: AMSType -> IO () -> IO ()
withHybridStore msType a = case msType of
AMSType SMSHybrid -> a
_ -> pure ()
logSize :: FilePath -> IO Int
logSize f =
try (length . B.lines <$> B.readFile f) >>= \case
Right l -> pure l
Left (_ :: SomeException) -> logSize f
logSize f = go (3 :: Int)
where
go n =
try (length . B.lines <$> B.readFile f) >>= \case
Right l -> pure l
Left (e :: SomeException)
| n == 0 -> throwIO e
| otherwise -> threadDelay 100000 >> go (n - 1)
testRestoreMessages :: SpecWith (ATransport, AMSType)
testRestoreMessages =
@@ -685,7 +691,8 @@ testRestoreMessages =
rId <- readTVarIO recipientId
logSize testStoreLogFile `shouldReturn` 2
withHybridStore msType $
logSize testStoreLogFile `shouldReturn` 2
-- logSize testStoreMsgsFile `shouldReturn` 5
logSize testServerStatsBackupFile `shouldReturn` 74
Right stats1 <- strDecode <$> B.readFile testServerStatsBackupFile
@@ -702,9 +709,10 @@ testRestoreMessages =
Resp "4" _ (Msg mId4 msg4) <- signSendRecv h rKey ("4", rId, ACK mId3)
(dec mId4 msg4, Right "hello 4") #== "restored message delivered"
logSize testStoreLogFile `shouldReturn` 1
-- the last message is not removed because it was not ACK'd
-- logSize testStoreMsgsFile `shouldReturn` 3
withHybridStore msType $
logSize testStoreLogFile `shouldReturn` 1
-- the last message is not removed because it was not ACK'd
-- logSize testStoreMsgsFile `shouldReturn` 3
logSize testServerStatsBackupFile `shouldReturn` 74
Right stats2 <- strDecode <$> B.readFile testServerStatsBackupFile
checkStats stats2 [rId] 5 3
@@ -721,13 +729,14 @@ testRestoreMessages =
(dec mId6 msg6, Left "ClientRcvMsgQuota") #== "restored message delivered"
Resp "7" _ OK <- signSendRecv h rKey ("7", rId, ACK mId6)
pure ()
logSize testStoreLogFile `shouldReturn` 1
-- logSize testStoreMsgsFile `shouldReturn` 0
withHybridStore msType $
logSize testStoreLogFile `shouldReturn` 1
-- logSize testStoreMsgsFile `shouldReturn` 0
logSize testServerStatsBackupFile `shouldReturn` 74
Right stats3 <- strDecode <$> B.readFile testServerStatsBackupFile
checkStats stats3 [rId] 5 5
removeFile testStoreLogFile
withHybridStore msType $ removeFile testStoreLogFile
removeFileIfExists testStoreMsgsFile
whenM (doesDirectoryExist testStoreMsgsDir) $ removeDirectoryRecursive testStoreMsgsDir
removeFile testServerStatsBackupFile
@@ -782,7 +791,8 @@ testRestoreExpireMessages =
Resp "4" _ OK <- signSendRecv h sKey ("4", sId, _SEND "hello 4")
pure ()
logSize testStoreLogFile `shouldReturn` 2
withHybridStore msType $
logSize testStoreLogFile `shouldReturn` 2
exportStoreMessages msType
msgs <- B.readFile testStoreMsgsFile
length (B.lines msgs) `shouldBe` 4
@@ -791,7 +801,8 @@ testRestoreExpireMessages =
cfg1 = (cfgMS msType) {messageExpiration = expCfg1, serverStatsBackupFile = Just testServerStatsBackupFile}
withSmpServerConfigOn at cfg1 testPort . runTest t $ \_ -> pure ()
logSize testStoreLogFile `shouldReturn` 1
withHybridStore msType $
logSize testStoreLogFile `shouldReturn` 1
exportStoreMessages msType
msgs' <- B.readFile testStoreMsgsFile
msgs' `shouldBe` msgs
@@ -800,7 +811,8 @@ testRestoreExpireMessages =
cfg2 = (cfgMS msType) {messageExpiration = expCfg2, serverStatsBackupFile = Just testServerStatsBackupFile}
withSmpServerConfigOn at cfg2 testPort . runTest t $ \_ -> pure ()
logSize testStoreLogFile `shouldReturn` 1
withHybridStore msType $
logSize testStoreLogFile `shouldReturn` 1
-- two messages expired
exportStoreMessages msType
msgs'' <- B.readFile testStoreMsgsFile
@@ -810,10 +822,13 @@ testRestoreExpireMessages =
_msgExpired `shouldBe` 2
where
exportStoreMessages :: AMSType -> IO ()
exportStoreMessages = \case
AMSType SMSJournal -> undefined -- TODO [queues]
exportStoreMessages msType = case msType of
AMSType SMSJournal -> do
ms <- newMsgStore $ (testJournalStoreCfg SMSJournal) {quota = 4}
removeFileIfExists testStoreMsgsFile
exportMessages False ms testStoreMsgsFile False
AMSType SMSHybrid -> do
ms <- newMsgStore testJournalStoreCfg {quota = 4}
ms <- newMsgStore $ (testJournalStoreCfg SMSHybrid) {quota = 4}
readWriteQueueStore testStoreLogFile ms >>= closeStoreLog
removeFileIfExists testStoreMsgsFile
exportMessages False ms testStoreMsgsFile False
+1 -1
View File
@@ -64,7 +64,7 @@ main = do
describe "SMP server via TLS, hybrid store" $ do
describe "SMP syntax" $ serverSyntaxTests (transport @TLS)
before (pure (transport @TLS, AMSType SMSHybrid)) serverTests
fdescribe "SMP server via TLS, journal message store" $ do
describe "SMP server via TLS, journal message store" $ do
before (pure (transport @TLS, AMSType SMSJournal)) serverTests
describe "SMP server via TLS, memory message store" $
before (pure (transport @TLS, AMSType SMSMemory)) serverTests