mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-06-04 08:41:25 +00:00
concurrent read/write
This commit is contained in:
@@ -44,6 +44,7 @@ module Simplex.Messaging.Server
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Concurrent.Async (concurrently)
|
||||
import Control.Concurrent.STM (throwSTM)
|
||||
import Control.Logger.Simple
|
||||
import Control.Monad
|
||||
@@ -74,7 +75,6 @@ import qualified Data.List.NonEmpty as L
|
||||
import Data.Map.Strict (Map)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (fromMaybe, isJust, isNothing)
|
||||
import Data.Semigroup (Sum (..))
|
||||
import qualified Data.Set as S
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
@@ -2114,10 +2114,22 @@ exportMessages :: forall s. MsgStoreClass s => Bool -> MsgStore s -> FilePath ->
|
||||
exportMessages tty st f drainMsgs = do
|
||||
logNote $ "saving messages to file " <> T.pack f
|
||||
run $ case st of
|
||||
StoreMemory ms -> exportMessages_ ms $ getMsgs ms
|
||||
StoreJournal ms -> exportMessages_ ms $ getJournalMsgs ms
|
||||
StoreMemory ms -> exportMessages_ getMsgs ms
|
||||
StoreJournal ms -> exportMessages_ getJournalMsgs ms
|
||||
where
|
||||
exportMessages_ ms get = fmap (\(Sum n) -> n) . unsafeWithAllMsgQueues tty ms . saveQueueMsgs get
|
||||
exportMessages_ get ms h = do
|
||||
saveQ <- newTBQueueIO 100
|
||||
let readMsgs = do
|
||||
unsafeWithAllMsgQueues tty ms $ \q ->
|
||||
get ms q >>= \msgs -> unless (null msgs) $ atomically $ writeTBQueue saveQ $ Just (q, msgs)
|
||||
atomically $ writeTBQueue saveQ Nothing
|
||||
writeMsgs n =
|
||||
atomically (readTBQueue saveQ) >>= \case
|
||||
Nothing -> pure n
|
||||
Just (q, msgs) -> do
|
||||
n' <- saveQueueMsgs h q msgs
|
||||
writeMsgs $ n + n'
|
||||
snd <$> concurrently readMsgs (writeMsgs 0)
|
||||
run :: (Handle -> IO Int) -> IO ()
|
||||
run a = liftIO $ withFile f WriteMode $ tryAny . a >=> \case
|
||||
Right n -> logNote $ "messages saved: " <> tshow n
|
||||
@@ -2130,11 +2142,10 @@ exportMessages tty st f drainMsgs = do
|
||||
Nothing -> getJournalQueueMessages ms q
|
||||
getMsgs :: MsgStoreClass s' => s' -> StoreQueue s' -> IO [Message]
|
||||
getMsgs ms q = unsafeRunStore q "saveQueueMsgs" $ getQueueMessages_ drainMsgs q =<< getMsgQueue ms q False
|
||||
saveQueueMsgs :: (StoreQueue s -> IO [Message]) -> Handle -> StoreQueue s -> IO (Sum Int)
|
||||
saveQueueMsgs get h q = do
|
||||
msgs <- get q
|
||||
saveQueueMsgs :: Handle -> StoreQueue s -> [Message] -> IO Int
|
||||
saveQueueMsgs h q msgs = do
|
||||
unless (null msgs) $ BLD.hPutBuilder h $ encodeMessages (recipientId q) msgs
|
||||
pure $ Sum $ length msgs
|
||||
pure $ length msgs
|
||||
encodeMessages rId = mconcat . map (\msg -> BLD.byteString (strEncode $ MLRv3 rId msg) <> BLD.char8 '\n')
|
||||
|
||||
processServerMessages :: forall s'. StartOptions -> M s' (Maybe MessageStats)
|
||||
|
||||
@@ -54,7 +54,7 @@ msgStoreTests = do
|
||||
around (withMsgStore testSMTStoreConfig) $ describe "STM message store" someMsgStoreTests
|
||||
around (withMsgStore $ testJournalStoreCfg MQStoreCfg) $ describe "Journal message store" $ do
|
||||
someMsgStoreTests
|
||||
it "should export and import journal store" testExportImportStore
|
||||
fit "should export and import journal store" testExportImportStore
|
||||
describe "queue state" $ do
|
||||
it "should restore queue state from the last line" testQueueState
|
||||
it "should recover when message is written and state is not" testMessageState
|
||||
|
||||
Reference in New Issue
Block a user