another approach

This commit is contained in:
Evgeny Poberezkin
2025-09-09 08:13:26 +01:00
parent 35ac0cbd51
commit 4bea527557
5 changed files with 77 additions and 21 deletions
+18 -8
View File
@@ -105,7 +105,7 @@ import Simplex.Messaging.Server.Control
import Simplex.Messaging.Server.Env.STM as Env
import Simplex.Messaging.Server.Expiration
import Simplex.Messaging.Server.MsgStore
import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore, JournalQueue)
import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore, JournalQueue, getJournalQueueMessages)
import Simplex.Messaging.Server.MsgStore.STM
import Simplex.Messaging.Server.MsgStore.Types
import Simplex.Messaging.Server.NtfStore
@@ -2103,26 +2103,36 @@ randomId = fmap EntityId . randomId'
{-# INLINE randomId #-}
saveServerMessages :: Bool -> MsgStore s -> IO ()
saveServerMessages drainMsgs = \case
StoreMemory ms@STMMsgStore {storeConfig = STMStoreConfig {storePath}} -> case storePath of
saveServerMessages drainMsgs ms = case ms of
StoreMemory STMMsgStore {storeConfig = STMStoreConfig {storePath}} -> case storePath of
Just f -> exportMessages False ms f drainMsgs
Nothing -> logNote "undelivered messages are not saved"
StoreJournal _ -> logNote "closed journal message storage"
exportMessages :: MsgStoreClass s => Bool -> s -> FilePath -> Bool -> IO ()
exportMessages :: forall s. MsgStoreClass s => Bool -> MsgStore s -> FilePath -> Bool -> IO ()
exportMessages tty ms f drainMsgs = do
logNote $ "saving messages to file " <> T.pack f
liftIO $ withFile f WriteMode $ \h ->
tryAny (unsafeWithAllMsgQueues tty True ms $ saveQueueMsgs h) >>= \case
tryAny (unsafeWithAllMsgQueues tty False ms' $ saveQueueMsgs h) >>= \case
Right (Sum total) -> logNote $ "messages saved: " <> tshow total
Left e -> do
logError $ "error exporting messages: " <> tshow e
exitFailure
where
saveQueueMsgs h q = do
msgs <-
ms' :: s
ms' = case ms of
StoreMemory s -> s
StoreJournal s -> s
getMessages q = case ms of
StoreMemory _ ->
unsafeRunStore q "saveQueueMsgs" $
getQueueMessages_ drainMsgs q =<< getMsgQueue ms q False
getQueueMessages_ drainMsgs q =<< getMsgQueue ms' q False
StoreJournal _ ->
unsafeRunStore q "saveQueueMsgs" $
getQueueMessages_ drainMsgs q =<< getMsgQueue ms' q False
-- getJournalQueueMessages ms' q
saveQueueMsgs h q = do
msgs <- getMessages q
unless (null msgs) $ BLD.hPutBuilder h $ encodeMessages (recipientId q) msgs
pure $ Sum $ length msgs
encodeMessages rId = mconcat . map (\msg -> BLD.byteString (strEncode $ MLRv3 rId msg) <> BLD.char8 '\n')
+2 -3
View File
@@ -145,12 +145,11 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
completedExport
| otherwise -> do
confirmExport
let msType = readStoreType ini
case readStoreType ini of
Right (ASType SQSMemory _) -> do
ms <- newJournalMsgStore logPath MQStoreCfg
readQueueStore True (mkQueue ms False) storeLogFile $ stmQueueStore ms
exportMessages True ms storeMsgsFilePath False
exportMessages True (StoreJournal ms) storeMsgsFilePath False
Right (ASType SQSPostgres SMSJournal) -> do
#if defined(dbServerPostgres)
let dbStoreLogPath = enableDbStoreLog' ini $> storeLogFilePath
@@ -159,7 +158,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
putStrLn $ "Schema " <> B.unpack schema <> " does not exist in PostrgreSQL database: " <> B.unpack connstr
exitFailure
ms <- newJournalMsgStore logPath $ PQStoreCfg PostgresStoreCfg {dbOpts, dbStoreLogPath, confirmMigrations = MCYesUp, deletedTTL = iniDeletedTTL ini}
exportMessages True ms storeMsgsFilePath False
exportMessages True (StoreJournal ms) storeMsgsFilePath False
#else
noPostgresExit
#endif
@@ -48,6 +48,7 @@ module Simplex.Messaging.Server.MsgStore.Journal
postgresQueueStore,
#endif
exportJournalMessages,
getJournalQueueMessages,
encodeMessages,
)
where
@@ -569,8 +570,9 @@ instance MsgStoreClass (JournalMsgStore s) where
where
getSize = maybe (pure (-1)) (fmap size . readTVarIO . state)
-- drainMsgs is never True with Journal storage
getQueueMessages_ :: Bool -> JournalQueue s -> JournalMsgQueue s -> StoreIO s [Message]
getQueueMessages_ drainMsgs q' q = StoreIO (run [])
getQueueMessages_ drainMsgs q' q = StoreIO $ if drainMsgs then run [] else readTVarIO (state q) >>= runFast
where
run msgs = readTVarIO (handles q) >>= maybe (pure []) (getMsg msgs)
getMsg msgs hs = chooseReadJournal q' q drainMsgs hs >>= maybe (pure msgs) readMsg
@@ -579,6 +581,23 @@ instance MsgStoreClass (JournalMsgStore s) where
(msg, len) <- hGetMsgAt h $ bytePos rs
updateReadPos q' q drainMsgs len hs
(msg :) <$> run msgs
runFast MsgQueueState {writeState = ws, readState = rs, size}
| size > 0 =
readTVarIO (handles q) >>= \case
Just (MsgQueueHandles _ rh wh_) -> case wh_ of
Just wh -> (++) <$> getAllMsgs rh (bytePos rs) (byteCount rs - bytePos rs) <*> getAllMsgs wh 0 (bytePos ws)
_ -> getAllMsgs rh (bytePos rs) (bytePos ws - bytePos rs)
Nothing -> pure []
| otherwise = pure []
getAllMsgs h pos sz
| sz > 0 = do
IO.hSeek h AbsoluteSeek $ fromIntegral pos
parseMsgs =<< B.hGet h (fromIntegral sz)
| otherwise = pure []
parseMsgs s = do
let (errs, msgs) = partitionEithers $ map strDecode $ B.lines s
unless (null errs) $ putStrLn $ "Error reading " <> show (length errs) <> " messages from " <> B.unpack (strEncode $ recipientId' q')
pure msgs
writeMsg :: JournalMsgStore s -> JournalQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
writeMsg ms q' logState msg = isolateQueue q' "writeMsg" $ do
@@ -1103,5 +1122,33 @@ exportJournalMessages tty ms@JournalMsgStore {config} h = ifM (doesDirectoryExis
(pure $ Just (queueId', path'))
(Nothing <$ putStrLn ("Error: path " <> path' <> " is not a directory, skipping"))
getJournalQueueMessages :: JournalMsgStore s -> JournalQueue s -> IO [Message]
getJournalQueueMessages ms q = do
let rId = recipientId' q
dir = msgQueueDirectory ms rId
statePath = msgQueueStatePath dir $ B.unpack (strEncode rId)
readQueueState ms statePath >>= \case
(Just MsgQueueState {readState = rs, writeState = ws, size}, _)
| size == 0 -> pure []
| journalId rs == journalId ws -> do
let f = journalFilePath dir $ journalId rs
s <- B.readFile f
parseMsgs f $ B.take (bytePos' ws - bytePos' rs) $ B.drop (bytePos' rs) s
| otherwise -> do
let rf = journalFilePath dir $ journalId rs
wf = journalFilePath dir $ journalId ws
r <- B.readFile rf
w <- B.readFile wf
rMsgs <- parseMsgs rf $ B.take (fromIntegral $ byteCount rs) $ B.drop (bytePos' rs) r
wMsgs <- parseMsgs wf $ B.take (bytePos' ws) w
pure $ rMsgs ++ wMsgs
_ -> pure []
where
bytePos' = fromIntegral . bytePos
parseMsgs f s = do
let (errs, msgs) = partitionEithers $ map strDecode $ B.lines s
unless (null errs) $ putStrLn $ "Error reading " <> show (length errs) <> " messages from " <> f
pure msgs
encodeMessages :: RecipientId -> [Message] -> BLD.Builder
encodeMessages rId = mconcat . map (\msg -> BLD.byteString (strEncode $ MLRv3 rId msg) <> BLD.char8 '\n')
+7 -7
View File
@@ -35,7 +35,7 @@ import Simplex.Messaging.Crypto (pattern MaxLenBS)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Protocol (EntityId (..), LinkId, Message (..), QueueLinkData, RecipientId, SParty (..), noMsgFlags)
import Simplex.Messaging.Server (exportMessages, importMessages, printMessageStats)
import Simplex.Messaging.Server.Env.STM (journalMsgStoreDepth, readWriteQueueStore)
import Simplex.Messaging.Server.Env.STM (MsgStore (..), journalMsgStoreDepth, readWriteQueueStore)
import Simplex.Messaging.Server.Expiration (ExpirationConfig (..), expireBeforeEpoch)
import Simplex.Messaging.Server.MsgStore.Journal
import Simplex.Messaging.Server.MsgStore.STM
@@ -55,7 +55,7 @@ msgStoreTests = do
around (withMsgStore testSMTStoreConfig) $ describe "STM message store" someMsgStoreTests
around (withMsgStore $ testJournalStoreCfg MQStoreCfg) $ describe "Journal message store" $ do
someMsgStoreTests
it "should export and import journal store" testExportImportStore
fit "should export and import journal store" testExportImportStore
describe "queue state" $ do
it "should restore queue state from the last line" testQueueState
it "should recover when message is written and state is not" testMessageState
@@ -226,8 +226,8 @@ testExportImportStore ms = do
pure ()
length <$> listDirectory (msgQueueDirectory ms rId1) `shouldReturn` 2
length <$> listDirectory (msgQueueDirectory ms rId2) `shouldReturn` 3
exportMessages False ms testStoreMsgsFile False
testFastExport ms testStoreMsgsFile
exportMessages False (StoreJournal ms) testStoreMsgsFile False
-- testFastExport ms testStoreMsgsFile
closeMsgStore ms
closeStoreLog sl
let cfg = (testJournalStoreCfg MQStoreCfg :: JournalStoreConfig 'QSMemory) {storePath = testStoreMsgsDir2}
@@ -238,14 +238,14 @@ testExportImportStore ms = do
printMessageStats "Messages" stats
length <$> listDirectory (msgQueueDirectory ms rId1) `shouldReturn` 2
length <$> listDirectory (msgQueueDirectory ms rId2) `shouldReturn` 3 -- 2 message files
exportMessages False ms' testStoreMsgsFile2 False
testFastExport ms' testStoreMsgsFile2
exportMessages False (StoreJournal ms') testStoreMsgsFile2 False
-- testFastExport ms' testStoreMsgsFile2
(B.readFile testStoreMsgsFile2 `shouldReturn`) =<< B.readFile (testStoreMsgsFile <> ".bak")
stmStore <- newMsgStore testSMTStoreConfig
readWriteQueueStore True (mkQueue stmStore True) testStoreLogFile (queueStore stmStore) >>= closeStoreLog
MessageStats {storedMsgsCount = 5, expiredMsgsCount = 0, storedQueues = 2} <-
importMessages False stmStore testStoreMsgsFile2 Nothing False
exportMessages False stmStore testStoreMsgsFile False
exportMessages False (StoreMemory stmStore) testStoreMsgsFile False
(B.sort <$> B.readFile testStoreMsgsFile `shouldReturn`) =<< (B.sort <$> B.readFile (testStoreMsgsFile2 <> ".bak"))
where
testFastExport ms' f = do
+2 -2
View File
@@ -42,7 +42,7 @@ import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Parsers (parseAll, parseString)
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server (exportMessages)
import Simplex.Messaging.Server.Env.STM (AStoreType (..), ServerConfig (..), ServerStoreCfg (..), readWriteQueueStore)
import Simplex.Messaging.Server.Env.STM (AStoreType (..), MsgStore (..), ServerConfig (..), ServerStoreCfg (..), readWriteQueueStore)
import Simplex.Messaging.Server.Expiration
import Simplex.Messaging.Server.MsgStore.Journal (JournalStoreConfig (..), QStoreCfg (..), stmQueueStore)
import Simplex.Messaging.Server.MsgStore.Types (MsgStoreClass (..), SMSType (..), SQSType (..), newMsgStore)
@@ -921,7 +921,7 @@ testRestoreExpireMessages =
ms <- newMsgStore (testJournalStoreCfg MQStoreCfg) {quota = 4}
readWriteQueueStore True (mkQueue ms True) testStoreLogFile (stmQueueStore ms) >>= closeStoreLog
removeFileIfExists testStoreMsgsFile
exportMessages False ms testStoreMsgsFile False
exportMessages False (StoreJournal ms) testStoreMsgsFile False
closeMsgStore ms
runTest :: Transport c => TProxy c 'TServer -> (THandleSMP c 'TClient -> IO ()) -> ThreadId -> Expectation
runTest _ test' server = do