From be9a84905f7df31e3512f7844d95dbdfce1986cf Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Mon, 1 Apr 2024 21:42:30 +0300 Subject: [PATCH] servers: read stores by chunks (#1080) --- src/Simplex/FileTransfer/Server/StoreLog.hs | 3 ++- src/Simplex/Messaging/Notifications/Server/StoreLog.hs | 3 ++- src/Simplex/Messaging/Server.hs | 6 ++++-- src/Simplex/Messaging/Server/StoreLog.hs | 9 +++++---- 4 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/Simplex/FileTransfer/Server/StoreLog.hs b/src/Simplex/FileTransfer/Server/StoreLog.hs index f2ee311b7..8e8add3d6 100644 --- a/src/Simplex/FileTransfer/Server/StoreLog.hs +++ b/src/Simplex/FileTransfer/Server/StoreLog.hs @@ -22,6 +22,7 @@ import Control.Concurrent.STM import Control.Monad.Except import qualified Data.Attoparsec.ByteString.Char8 as A import qualified Data.ByteString.Char8 as B +import qualified Data.ByteString.Lazy.Char8 as LB import Data.Composition ((.:), (.:.)) import Data.List.NonEmpty (NonEmpty) import qualified Data.List.NonEmpty as L @@ -88,7 +89,7 @@ readWriteFileStore f st = do pure s readFileStore :: FilePath -> FileStore -> IO () -readFileStore f st = mapM_ addFileLogRecord . B.lines =<< B.readFile f +readFileStore f st = mapM_ (addFileLogRecord . LB.toStrict) . LB.lines =<< LB.readFile f where addFileLogRecord s = case strDecode s of Left e -> B.putStrLn $ "Log parsing error (" <> B.pack e <> "): " <> B.take 100 s diff --git a/src/Simplex/Messaging/Notifications/Server/StoreLog.hs b/src/Simplex/Messaging/Notifications/Server/StoreLog.hs index cc2a1802e..1f206cecc 100644 --- a/src/Simplex/Messaging/Notifications/Server/StoreLog.hs +++ b/src/Simplex/Messaging/Notifications/Server/StoreLog.hs @@ -27,6 +27,7 @@ import Control.Concurrent.STM import Control.Monad import qualified Data.Attoparsec.ByteString.Char8 as A import qualified Data.ByteString.Char8 as B +import qualified Data.ByteString.Lazy.Char8 as LB import Data.Word (Word16) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String @@ -189,7 +190,7 @@ readWriteNtfStore f st = do pure s readNtfStore :: FilePath -> NtfStore -> IO () -readNtfStore f st = mapM_ addNtfLogRecord . B.lines =<< B.readFile f +readNtfStore f st = mapM_ (addNtfLogRecord . LB.toStrict) . LB.lines =<< LB.readFile f where addNtfLogRecord s = case strDecode s of Left e -> B.putStrLn $ "Log parsing error (" <> B.pack e <> "): " <> B.take 100 s diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 0299bebfc..ca99e87bb 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -47,6 +47,7 @@ import Crypto.Random import Data.Bifunctor (first) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B +import qualified Data.ByteString.Lazy.Char8 as LB import Data.Either (fromRight, partitionEithers) import Data.Functor (($>)) import Data.Int (Int64) @@ -983,7 +984,7 @@ restoreServerMessages = asks (storeMsgsFile . config) >>= \case ms <- asks msgStore quota <- asks $ msgQueueQuota . config old_ <- asks (messageExpiration . config) $>>= (liftIO . fmap Just . expireBeforeEpoch) - runExceptT (liftIO (B.readFile f) >>= foldM (\expired -> restoreMsg expired ms quota old_) 0 . B.lines) >>= \case + runExceptT (liftIO (LB.readFile f) >>= foldM (\expired -> restoreMsg expired ms quota old_) 0 . LB.lines) >>= \case Left e -> do logError . T.pack $ "error restoring messages: " <> e liftIO exitFailure @@ -992,10 +993,11 @@ restoreServerMessages = asks (storeMsgsFile . config) >>= \case logInfo "messages restored" pure expired where - restoreMsg !expired ms quota old_ s = do + restoreMsg !expired ms quota old_ s' = do MLRv3 rId msg <- liftEither . first (msgErr "parsing") $ strDecode s addToMsgQueue rId msg where + s = LB.toStrict s' addToMsgQueue rId msg = do (isExpired, logFull) <- atomically $ do q <- getMsgQueue ms rId quota diff --git a/src/Simplex/Messaging/Server/StoreLog.hs b/src/Simplex/Messaging/Server/StoreLog.hs index b3f5486b8..b1011c404 100644 --- a/src/Simplex/Messaging/Server/StoreLog.hs +++ b/src/Simplex/Messaging/Server/StoreLog.hs @@ -25,8 +25,8 @@ where import Control.Applicative (optional, (<|>)) import Control.Monad (foldM, unless, when) -import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B +import qualified Data.ByteString.Lazy.Char8 as LB import Data.Functor (($>)) import Data.Map.Strict (Map) import qualified Data.Map.Strict as M @@ -148,13 +148,14 @@ writeQueues s = mapM_ $ \q -> when (active q) $ logCreateQueue s q active QueueRec {status} = status == QueueActive readQueues :: FilePath -> IO (Map RecipientId QueueRec) -readQueues f = foldM processLine M.empty . B.lines =<< B.readFile f +readQueues f = foldM processLine M.empty . LB.lines =<< LB.readFile f where - processLine :: Map RecipientId QueueRec -> ByteString -> IO (Map RecipientId QueueRec) - processLine m s = case strDecode $ trimCR s of + processLine :: Map RecipientId QueueRec -> LB.ByteString -> IO (Map RecipientId QueueRec) + processLine m s' = case strDecode $ trimCR s of Right r -> pure $ procLogRecord r Left e -> printError e $> m where + s = LB.toStrict s' procLogRecord :: StoreLogRecord -> Map RecipientId QueueRec procLogRecord = \case CreateQueue q -> M.insert (recipientId q) q m