servers: read stores by chunks (#1080)

This commit is contained in:
Alexander Bondarenko
2024-04-01 21:42:30 +03:00
committed by GitHub
parent bfd532e833
commit be9a84905f
4 changed files with 13 additions and 8 deletions
+2 -1
View File
@@ -22,6 +22,7 @@ import Control.Concurrent.STM
import Control.Monad.Except
import qualified Data.Attoparsec.ByteString.Char8 as A
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as LB
import Data.Composition ((.:), (.:.))
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as L
@@ -88,7 +89,7 @@ readWriteFileStore f st = do
pure s
readFileStore :: FilePath -> FileStore -> IO ()
readFileStore f st = mapM_ addFileLogRecord . B.lines =<< B.readFile f
readFileStore f st = mapM_ (addFileLogRecord . LB.toStrict) . LB.lines =<< LB.readFile f
where
addFileLogRecord s = case strDecode s of
Left e -> B.putStrLn $ "Log parsing error (" <> B.pack e <> "): " <> B.take 100 s
@@ -27,6 +27,7 @@ import Control.Concurrent.STM
import Control.Monad
import qualified Data.Attoparsec.ByteString.Char8 as A
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as LB
import Data.Word (Word16)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
@@ -189,7 +190,7 @@ readWriteNtfStore f st = do
pure s
readNtfStore :: FilePath -> NtfStore -> IO ()
readNtfStore f st = mapM_ addNtfLogRecord . B.lines =<< B.readFile f
readNtfStore f st = mapM_ (addNtfLogRecord . LB.toStrict) . LB.lines =<< LB.readFile f
where
addNtfLogRecord s = case strDecode s of
Left e -> B.putStrLn $ "Log parsing error (" <> B.pack e <> "): " <> B.take 100 s
+4 -2
View File
@@ -47,6 +47,7 @@ import Crypto.Random
import Data.Bifunctor (first)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as LB
import Data.Either (fromRight, partitionEithers)
import Data.Functor (($>))
import Data.Int (Int64)
@@ -983,7 +984,7 @@ restoreServerMessages = asks (storeMsgsFile . config) >>= \case
ms <- asks msgStore
quota <- asks $ msgQueueQuota . config
old_ <- asks (messageExpiration . config) $>>= (liftIO . fmap Just . expireBeforeEpoch)
runExceptT (liftIO (B.readFile f) >>= foldM (\expired -> restoreMsg expired ms quota old_) 0 . B.lines) >>= \case
runExceptT (liftIO (LB.readFile f) >>= foldM (\expired -> restoreMsg expired ms quota old_) 0 . LB.lines) >>= \case
Left e -> do
logError . T.pack $ "error restoring messages: " <> e
liftIO exitFailure
@@ -992,10 +993,11 @@ restoreServerMessages = asks (storeMsgsFile . config) >>= \case
logInfo "messages restored"
pure expired
where
restoreMsg !expired ms quota old_ s = do
restoreMsg !expired ms quota old_ s' = do
MLRv3 rId msg <- liftEither . first (msgErr "parsing") $ strDecode s
addToMsgQueue rId msg
where
s = LB.toStrict s'
addToMsgQueue rId msg = do
(isExpired, logFull) <- atomically $ do
q <- getMsgQueue ms rId quota
+5 -4
View File
@@ -25,8 +25,8 @@ where
import Control.Applicative (optional, (<|>))
import Control.Monad (foldM, unless, when)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as LB
import Data.Functor (($>))
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
@@ -148,13 +148,14 @@ writeQueues s = mapM_ $ \q -> when (active q) $ logCreateQueue s q
active QueueRec {status} = status == QueueActive
readQueues :: FilePath -> IO (Map RecipientId QueueRec)
readQueues f = foldM processLine M.empty . B.lines =<< B.readFile f
readQueues f = foldM processLine M.empty . LB.lines =<< LB.readFile f
where
processLine :: Map RecipientId QueueRec -> ByteString -> IO (Map RecipientId QueueRec)
processLine m s = case strDecode $ trimCR s of
processLine :: Map RecipientId QueueRec -> LB.ByteString -> IO (Map RecipientId QueueRec)
processLine m s' = case strDecode $ trimCR s of
Right r -> pure $ procLogRecord r
Left e -> printError e $> m
where
s = LB.toStrict s'
procLogRecord :: StoreLogRecord -> Map RecipientId QueueRec
procLogRecord = \case
CreateQueue q -> M.insert (recipientId q) q m