mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-06-06 22:01:54 +00:00
refactor: stream export instead of loading recipients into memory
This commit is contained in:
@@ -277,37 +277,45 @@ exportFileStore :: FilePath -> PostgresFileStoreCfg -> IO ()
|
||||
exportFileStore storeLogFilePath dbCfg = do
|
||||
pgStore <- newFileStore dbCfg :: IO PostgresFileStore
|
||||
sl <- openWriteStoreLog False storeLogFilePath
|
||||
-- Fold 1: stream files, write FNEW + FPUT per file
|
||||
putStrLn "Exporting files..."
|
||||
-- Load all recipients into a map for lookup
|
||||
rcpMap <- withTransaction (dbStore pgStore) $ \db ->
|
||||
DB.fold_
|
||||
db
|
||||
"SELECT recipient_id, sender_id, recipient_key FROM recipients ORDER BY sender_id"
|
||||
M.empty
|
||||
(\acc (rId, sId, rKeyBs :: ByteString) ->
|
||||
case C.decodePubKey rKeyBs of
|
||||
Right rKey -> pure $! M.insertWith (++) sId [FileRecipient rId rKey] acc
|
||||
Left _ -> putStrLn ("WARNING: invalid recipient key for " <> show rId) $> acc)
|
||||
-- Fold over files, writing StoreLog records
|
||||
(!fCnt, !rCnt) <- withTransaction (dbStore pgStore) $ \db ->
|
||||
!fCnt <- withTransaction (dbStore pgStore) $ \db ->
|
||||
DB.fold_
|
||||
db
|
||||
"SELECT sender_id, file_size, file_digest, sender_key, file_path, created_at, status FROM files ORDER BY created_at"
|
||||
(0 :: Int, 0 :: Int)
|
||||
( \(!fc, !rc) (sId, size :: Int32, digest :: ByteString, sndKeyBs :: ByteString, path :: Maybe String, createdAt, status) ->
|
||||
(0 :: Int)
|
||||
( \(!fc) (sId, size :: Int32, digest :: ByteString, sndKeyBs :: ByteString, path :: Maybe String, createdAt, status) ->
|
||||
case C.decodePubKey sndKeyBs of
|
||||
Right sndKey -> do
|
||||
let fileInfo = FileInfo {sndKey, size = fromIntegral size, digest}
|
||||
logAddFile sl sId fileInfo createdAt status
|
||||
let rcps = M.findWithDefault [] sId rcpMap
|
||||
rc' = rc + length rcps
|
||||
forM_ (L.nonEmpty rcps) $ logAddRecipients sl sId
|
||||
forM_ path $ logPutFile sl sId
|
||||
pure (fc + 1, rc')
|
||||
pure (fc + 1)
|
||||
Left _ -> do
|
||||
putStrLn $ "WARNING: invalid sender key for " <> show sId
|
||||
pure (fc, rc)
|
||||
pure fc
|
||||
)
|
||||
-- Fold 2: stream recipients ordered by sender_id, flush FADD on sender change
|
||||
putStrLn "Exporting recipients..."
|
||||
!rCnt <- withTransaction (dbStore pgStore) $ \db ->
|
||||
DB.fold_
|
||||
db
|
||||
"SELECT sender_id, recipient_id, recipient_key FROM recipients ORDER BY sender_id"
|
||||
(Nothing :: Maybe SenderId, [] :: [FileRecipient], 0 :: Int)
|
||||
( \(!prevSId, !buf, !rc) (sId, rId, rKeyBs :: ByteString) ->
|
||||
case C.decodePubKey rKeyBs of
|
||||
Right rKey -> do
|
||||
let rcp = FileRecipient rId rKey
|
||||
case prevSId of
|
||||
Just prev | prev /= sId -> do
|
||||
forM_ (L.nonEmpty buf) $ logAddRecipients sl prev
|
||||
pure (Just sId, [rcp], rc + length buf)
|
||||
_ -> pure (Just sId, rcp : buf, rc)
|
||||
Left _ -> putStrLn ("WARNING: invalid recipient key for " <> show rId) $> (prevSId, buf, rc)
|
||||
)
|
||||
>>= \(lastSId, buf, rc) -> do
|
||||
forM_ lastSId $ \sId -> forM_ (L.nonEmpty buf) $ logAddRecipients sl sId
|
||||
pure (rc + length buf)
|
||||
closeStoreLog sl
|
||||
closeFileStore pgStore
|
||||
putStrLn $ "Exported " <> show fCnt <> " files, " <> show rCnt <> " recipients to " <> storeLogFilePath
|
||||
|
||||
Reference in New Issue
Block a user