mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-11 12:34:46 +00:00
feat: add database import/export CLI commands
This commit is contained in:
@@ -20,6 +20,8 @@ module Simplex.FileTransfer.Server.Env
|
||||
newXFTPServerEnv,
|
||||
runWithStoreConfig,
|
||||
checkFileStoreMode,
|
||||
importToDatabase,
|
||||
exportFromDatabase,
|
||||
) where
|
||||
|
||||
import Control.Logger.Simple
|
||||
@@ -40,7 +42,7 @@ import Simplex.FileTransfer.Server.Store.STM (STMFileStore (..))
|
||||
import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation)
|
||||
#if defined(dbServerPostgres)
|
||||
import Data.Functor (($>))
|
||||
import Simplex.FileTransfer.Server.Store.Postgres (PostgresFileStore)
|
||||
import Simplex.FileTransfer.Server.Store.Postgres (PostgresFileStore, importFileStore, exportFileStore)
|
||||
import Simplex.FileTransfer.Server.Store.Postgres.Config (PostgresFileStoreCfg (..), defaultXFTPDBOpts)
|
||||
import Simplex.Messaging.Server.CLI (iniDBOptions, settingIsOn)
|
||||
import System.Directory (doesFileExist)
|
||||
@@ -200,3 +202,23 @@ checkFileStoreMode ini storeType storeLogFilePath = case storeType of
|
||||
#else
|
||||
checkFileStoreMode _ _ _ = pure ()
|
||||
#endif
|
||||
|
||||
-- | Import StoreLog to PostgreSQL database.
|
||||
importToDatabase :: FilePath -> Ini -> MigrationConfirmation -> IO ()
|
||||
#if defined(dbServerPostgres)
|
||||
importToDatabase storeLogFilePath ini _confirmMigrations = do
|
||||
let dbCfg = PostgresFileStoreCfg {dbOpts = iniDBOptions ini defaultXFTPDBOpts, dbStoreLogPath = Nothing, confirmMigrations = _confirmMigrations}
|
||||
importFileStore storeLogFilePath dbCfg
|
||||
#else
|
||||
importToDatabase _ _ _ = error "Error: server binary is compiled without support for PostgreSQL database.\nPlease re-compile with `cabal build -fserver_postgres`."
|
||||
#endif
|
||||
|
||||
-- | Export PostgreSQL database to StoreLog.
|
||||
exportFromDatabase :: FilePath -> Ini -> MigrationConfirmation -> IO ()
|
||||
#if defined(dbServerPostgres)
|
||||
exportFromDatabase storeLogFilePath ini _confirmMigrations = do
|
||||
let dbCfg = PostgresFileStoreCfg {dbOpts = iniDBOptions ini defaultXFTPDBOpts, dbStoreLogPath = Nothing, confirmMigrations = _confirmMigrations}
|
||||
exportFileStore storeLogFilePath dbCfg
|
||||
#else
|
||||
exportFromDatabase _ _ _ = error "Error: server binary is compiled without support for PostgreSQL database.\nPlease re-compile with `cabal build -fserver_postgres`."
|
||||
#endif
|
||||
|
||||
@@ -12,7 +12,7 @@ module Simplex.FileTransfer.Server.Main
|
||||
xftpServerCLI_,
|
||||
) where
|
||||
|
||||
import Control.Monad (when)
|
||||
import Control.Monad (unless, when)
|
||||
import Data.Either (fromRight)
|
||||
import Data.Functor (($>))
|
||||
import Data.Ini (lookupValue, readIniFile)
|
||||
@@ -28,7 +28,7 @@ import Options.Applicative
|
||||
import Simplex.FileTransfer.Chunks
|
||||
import Simplex.FileTransfer.Description (FileSize (..))
|
||||
import Simplex.FileTransfer.Server (runXFTPServer)
|
||||
import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..), defFileExpirationHours, defaultFileExpiration, defaultInactiveClientExpiration, runWithStoreConfig, checkFileStoreMode)
|
||||
import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..), defFileExpirationHours, defaultFileExpiration, defaultInactiveClientExpiration, runWithStoreConfig, checkFileStoreMode, importToDatabase, exportFromDatabase)
|
||||
import Simplex.FileTransfer.Transport (alpnSupportedXFTPhandshakes, supportedFileServerVRange)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Encoding.String
|
||||
@@ -71,6 +71,10 @@ xftpServerCLI_ generateSite serveStaticFiles cfgPath logPath = do
|
||||
doesFileExist iniFile >>= \case
|
||||
True -> readIniFile iniFile >>= either exitError (runServer opts)
|
||||
_ -> exitError $ "Error: server is not initialized (" <> iniFile <> " does not exist).\nRun `" <> executableName <> " init`."
|
||||
Database cmd ->
|
||||
doesFileExist iniFile >>= \case
|
||||
True -> readIniFile iniFile >>= either exitError (runDatabaseCmd cmd)
|
||||
_ -> exitError $ "Error: server is not initialized (" <> iniFile <> " does not exist).\nRun `" <> executableName <> " init`."
|
||||
Delete -> do
|
||||
confirmOrExit
|
||||
"WARNING: deleting the server will make all queues inaccessible, because the server identity (certificate fingerprint) will change.\nTHIS CANNOT BE UNDONE!"
|
||||
@@ -85,6 +89,21 @@ xftpServerCLI_ generateSite serveStaticFiles cfgPath logPath = do
|
||||
executableName = "file-server"
|
||||
storeLogFilePath = combine logPath "file-server-store.log"
|
||||
defaultStaticPath = combine logPath "www"
|
||||
runDatabaseCmd cmd ini = case cmd of
|
||||
SCImport -> do
|
||||
storeLogExists <- doesFileExist storeLogFilePath
|
||||
unless storeLogExists $ exitError $ "Error: store log file " <> storeLogFilePath <> " does not exist."
|
||||
confirmOrExit
|
||||
("Import store log " <> storeLogFilePath <> " to PostgreSQL database?")
|
||||
"Import cancelled."
|
||||
importToDatabase storeLogFilePath ini MCYesUp
|
||||
SCExport -> do
|
||||
storeLogExists <- doesFileExist storeLogFilePath
|
||||
when storeLogExists $ exitError $ "Error: store log file " <> storeLogFilePath <> " already exists."
|
||||
confirmOrExit
|
||||
("Export PostgreSQL database to store log " <> storeLogFilePath <> "?")
|
||||
"Export cancelled."
|
||||
exportFromDatabase storeLogFilePath ini MCConsole
|
||||
initializeServer InitOptions {enableStoreLog, signAlgorithm, ip, fqdn, filesPath, fileSizeQuota, webStaticPath = webStaticPath_} = do
|
||||
clearDirIfExists cfgPath
|
||||
clearDirIfExists logPath
|
||||
@@ -302,8 +321,11 @@ data CliCommand
|
||||
= Init InitOptions
|
||||
| OnlineCert CertOptions
|
||||
| Start StartOptions
|
||||
| Database StoreCmd
|
||||
| Delete
|
||||
|
||||
data StoreCmd = SCImport | SCExport
|
||||
|
||||
newtype StartOptions = StartOptions
|
||||
{ confirmMigrations :: MigrationConfirmation
|
||||
}
|
||||
@@ -325,6 +347,7 @@ cliCommandP cfgPath logPath iniFile =
|
||||
( command "init" (info (Init <$> initP) (progDesc $ "Initialize server - creates " <> cfgPath <> " and " <> logPath <> " directories and configuration files"))
|
||||
<> command "cert" (info (OnlineCert <$> certOptionsP) (progDesc $ "Generate new online TLS server credentials (configuration: " <> iniFile <> ")"))
|
||||
<> command "start" (info (Start <$> startOptsP) (progDesc $ "Start server (configuration: " <> iniFile <> ")"))
|
||||
<> command "database" (info (Database <$> storeCmdP) (progDesc "Import/export file store to/from PostgreSQL database"))
|
||||
<> command "delete" (info (pure Delete) (progDesc "Delete configuration and log files"))
|
||||
)
|
||||
where
|
||||
@@ -408,3 +431,9 @@ cliCommandP cfgPath logPath iniFile =
|
||||
"up" -> Right MCYesUp
|
||||
"down" -> Right MCYesUpDown
|
||||
_ -> Left "invalid migration confirmation, pass 'up' or 'down'"
|
||||
storeCmdP :: Parser StoreCmd
|
||||
storeCmdP =
|
||||
hsubparser
|
||||
( command "import" (info (pure SCImport) (progDesc "Import store log file into PostgreSQL database"))
|
||||
<> command "export" (info (pure SCExport) (progDesc "Export PostgreSQL database to store log file"))
|
||||
)
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
{-# LANGUAGE BangPatterns #-}
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
@@ -13,6 +14,8 @@ module Simplex.FileTransfer.Server.Store.Postgres
|
||||
handleDuplicate,
|
||||
assertUpdated,
|
||||
withLog,
|
||||
importFileStore,
|
||||
exportFileStore,
|
||||
)
|
||||
where
|
||||
|
||||
@@ -22,32 +25,45 @@ import Control.Monad
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.IO.Class
|
||||
import Control.Monad.Trans.Except (throwE)
|
||||
import Data.ByteString (ByteString)
|
||||
import Data.ByteString.Builder (Builder)
|
||||
import qualified Data.ByteString.Builder as BB
|
||||
import qualified Data.ByteString.Lazy as LB
|
||||
import Data.Functor (($>))
|
||||
import Data.Int (Int32, Int64)
|
||||
import Data.List (intersperse)
|
||||
import qualified Data.List.NonEmpty as L
|
||||
import qualified Data.Map.Strict as M
|
||||
import qualified Data.Set as S
|
||||
import Data.Text (Text)
|
||||
import Data.Word (Word32)
|
||||
import Database.PostgreSQL.Simple (Binary (..), Only (..), SqlError)
|
||||
import Database.PostgreSQL.Simple.Errors (ConstraintViolation (..), constraintViolation)
|
||||
import qualified Database.PostgreSQL.Simple as DB
|
||||
import qualified Database.PostgreSQL.Simple.Copy as DB
|
||||
import Database.PostgreSQL.Simple.Errors (ConstraintViolation (..), constraintViolation)
|
||||
import Database.PostgreSQL.Simple.ToField (Action (..), ToField (..))
|
||||
import GHC.IO (catchAny)
|
||||
import Simplex.FileTransfer.Protocol (FileInfo (..), SFileParty (..))
|
||||
import Simplex.FileTransfer.Server.Store
|
||||
import Simplex.FileTransfer.Server.Store.Postgres.Config
|
||||
import Simplex.FileTransfer.Server.Store.Postgres.Migrations (xftpServerMigrations)
|
||||
import Simplex.FileTransfer.Server.Store.STM (STMFileStore (..))
|
||||
import Simplex.FileTransfer.Server.StoreLog
|
||||
import Simplex.FileTransfer.Transport (XFTPErrorType (..))
|
||||
import Simplex.Messaging.Agent.Store.Postgres (closeDBStore, createDBStore)
|
||||
import Simplex.Messaging.Agent.Store.Postgres.Common (DBStore, withTransaction)
|
||||
import Simplex.Messaging.Agent.Store.Shared (MigrationConfig (..))
|
||||
import Simplex.Messaging.Agent.Store.Postgres.Options (DBOpts (..))
|
||||
import Simplex.Messaging.Agent.Store.Shared (MigrationConfig (..), MigrationConfirmation (..))
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Protocol (SenderId)
|
||||
import Simplex.Messaging.Protocol (RcvPublicAuthKey, RecipientId, SenderId)
|
||||
import Simplex.Messaging.Transport (EntityId (..))
|
||||
import Simplex.Messaging.Server.QueueStore (ServerEntityStatus (..))
|
||||
import Simplex.Messaging.Server.QueueStore.Postgres ()
|
||||
import Simplex.Messaging.Server.StoreLog (openWriteStoreLog)
|
||||
import Simplex.Messaging.Util (tshow)
|
||||
import System.Directory (renameFile)
|
||||
import System.Exit (exitFailure)
|
||||
import System.IO (IOMode (..))
|
||||
import System.IO (IOMode (..), hFlush, stdout)
|
||||
import UnliftIO.STM
|
||||
|
||||
data PostgresFileStore = PostgresFileStore
|
||||
@@ -207,3 +223,133 @@ withLog :: MonadIO m => Text -> PostgresFileStore -> (StoreLog 'WriteMode -> IO
|
||||
withLog op PostgresFileStore {dbStoreLog} action =
|
||||
forM_ dbStoreLog $ \sl -> liftIO $ action sl `catchAny` \e ->
|
||||
logWarn $ "STORE: " <> op <> ", withLog, " <> tshow e
|
||||
|
||||
-- Import: StoreLog -> PostgreSQL
|
||||
|
||||
importFileStore :: FilePath -> PostgresFileStoreCfg -> IO ()
|
||||
importFileStore storeLogFilePath dbCfg = do
|
||||
putStrLn $ "Reading store log: " <> storeLogFilePath
|
||||
stmStore <- newFileStore () :: IO STMFileStore
|
||||
sl <- readWriteFileStore storeLogFilePath stmStore
|
||||
closeStoreLog sl
|
||||
allFiles <- readTVarIO (files stmStore)
|
||||
allRcps <- readTVarIO (recipients stmStore)
|
||||
let fileCount = M.size allFiles
|
||||
rcpCount = M.size allRcps
|
||||
putStrLn $ "Loaded " <> show fileCount <> " files, " <> show rcpCount <> " recipients."
|
||||
let dbCfg' = dbCfg {dbOpts = (dbOpts dbCfg) {createSchema = True}, confirmMigrations = MCYesUp}
|
||||
pgStore <- newFileStore dbCfg' :: IO PostgresFileStore
|
||||
putStrLn "Importing files..."
|
||||
fCnt <- withTransaction (dbStore pgStore) $ \db -> do
|
||||
DB.copy_
|
||||
db
|
||||
"COPY files (sender_id, file_size, file_digest, sender_key, file_path, created_at, status) FROM STDIN WITH (FORMAT csv)"
|
||||
iforM_ (M.toList allFiles) $ \i (sId, fr) -> do
|
||||
DB.putCopyData db =<< fileRecToCSV sId fr
|
||||
when (i > 0 && i `mod` 10000 == 0) $ putStr (" " <> show i <> " files\r") >> hFlush stdout
|
||||
DB.putCopyEnd db
|
||||
[Only cnt] <- DB.query_ db "SELECT COUNT(*) FROM files"
|
||||
pure (cnt :: Int64)
|
||||
putStrLn $ "Imported " <> show fCnt <> " files."
|
||||
putStrLn "Importing recipients..."
|
||||
rCnt <- withTransaction (dbStore pgStore) $ \db -> do
|
||||
DB.copy_
|
||||
db
|
||||
"COPY recipients (recipient_id, sender_id, recipient_key) FROM STDIN WITH (FORMAT csv)"
|
||||
iforM_ (M.toList allRcps) $ \i (rId, (sId, rKey)) -> do
|
||||
DB.putCopyData db $ recipientToCSV rId sId rKey
|
||||
when (i > 0 && i `mod` 10000 == 0) $ putStr (" " <> show i <> " recipients\r") >> hFlush stdout
|
||||
DB.putCopyEnd db
|
||||
[Only cnt] <- DB.query_ db "SELECT COUNT(*) FROM recipients"
|
||||
pure (cnt :: Int64)
|
||||
putStrLn $ "Imported " <> show rCnt <> " recipients."
|
||||
when (fromIntegral fileCount /= fCnt) $
|
||||
putStrLn $ "WARNING: expected " <> show fileCount <> " files, got " <> show fCnt
|
||||
when (fromIntegral rcpCount /= rCnt) $
|
||||
putStrLn $ "WARNING: expected " <> show rcpCount <> " recipients, got " <> show rCnt
|
||||
closeFileStore pgStore
|
||||
renameFile storeLogFilePath (storeLogFilePath <> ".bak")
|
||||
putStrLn $ "Store log renamed to " <> storeLogFilePath <> ".bak"
|
||||
|
||||
-- Export: PostgreSQL -> StoreLog
|
||||
|
||||
exportFileStore :: FilePath -> PostgresFileStoreCfg -> IO ()
|
||||
exportFileStore storeLogFilePath dbCfg = do
|
||||
pgStore <- newFileStore dbCfg :: IO PostgresFileStore
|
||||
sl <- openWriteStoreLog False storeLogFilePath
|
||||
putStrLn "Exporting files..."
|
||||
-- Load all recipients into a map for lookup
|
||||
rcpMap <- withTransaction (dbStore pgStore) $ \db ->
|
||||
DB.fold_
|
||||
db
|
||||
"SELECT recipient_id, sender_id, recipient_key FROM recipients ORDER BY sender_id"
|
||||
M.empty
|
||||
(\acc (rId, sId, rKeyBs :: ByteString) ->
|
||||
case C.decodePubKey rKeyBs of
|
||||
Right rKey -> pure $! M.insertWith (++) sId [FileRecipient rId rKey] acc
|
||||
Left _ -> putStrLn ("WARNING: invalid recipient key for " <> show rId) $> acc)
|
||||
-- Fold over files, writing StoreLog records
|
||||
(!fCnt, !rCnt) <- withTransaction (dbStore pgStore) $ \db ->
|
||||
DB.fold_
|
||||
db
|
||||
"SELECT sender_id, file_size, file_digest, sender_key, file_path, created_at, status FROM files ORDER BY created_at"
|
||||
(0 :: Int, 0 :: Int)
|
||||
( \(!fc, !rc) (sId, size :: Int32, digest :: ByteString, sndKeyBs :: ByteString, path :: Maybe String, createdAt, status) ->
|
||||
case C.decodePubKey sndKeyBs of
|
||||
Right sndKey -> do
|
||||
let fileInfo = FileInfo {sndKey, size = fromIntegral size, digest}
|
||||
logAddFile sl sId fileInfo createdAt status
|
||||
let rcps = M.findWithDefault [] sId rcpMap
|
||||
rc' = rc + length rcps
|
||||
forM_ (L.nonEmpty rcps) $ logAddRecipients sl sId
|
||||
forM_ path $ logPutFile sl sId
|
||||
pure (fc + 1, rc')
|
||||
Left _ -> do
|
||||
putStrLn $ "WARNING: invalid sender key for " <> show sId
|
||||
pure (fc, rc)
|
||||
)
|
||||
closeStoreLog sl
|
||||
closeFileStore pgStore
|
||||
putStrLn $ "Exported " <> show fCnt <> " files, " <> show rCnt <> " recipients to " <> storeLogFilePath
|
||||
|
||||
-- CSV helpers for COPY protocol
|
||||
|
||||
iforM_ :: Monad m => [a] -> (Int -> a -> m ()) -> m ()
|
||||
iforM_ xs f = zipWithM_ f [0 ..] xs
|
||||
|
||||
fileRecToCSV :: SenderId -> FileRec -> IO ByteString
|
||||
fileRecToCSV sId FileRec {fileInfo = FileInfo {sndKey, size, digest}, filePath, createdAt, fileStatus} = do
|
||||
path <- readTVarIO filePath
|
||||
status <- readTVarIO fileStatus
|
||||
pure $ LB.toStrict $ BB.toLazyByteString $ mconcat (BB.char7 ',' `intersperse` fields path status) <> BB.char7 '\n'
|
||||
where
|
||||
fields path status =
|
||||
[ renderField (toField (Binary (unEntityId sId))),
|
||||
renderField (toField (fromIntegral size :: Int32)),
|
||||
renderField (toField (Binary digest)),
|
||||
renderField (toField (Binary (C.encodePubKey sndKey))),
|
||||
nullable (toField <$> path),
|
||||
renderField (toField createdAt),
|
||||
BB.char7 '"' <> renderField (toField status) <> BB.char7 '"'
|
||||
]
|
||||
|
||||
recipientToCSV :: RecipientId -> SenderId -> RcvPublicAuthKey -> ByteString
|
||||
recipientToCSV rId sId rKey =
|
||||
LB.toStrict $ BB.toLazyByteString $ mconcat (BB.char7 ',' `intersperse` fields) <> BB.char7 '\n'
|
||||
where
|
||||
fields =
|
||||
[ renderField (toField (Binary (unEntityId rId))),
|
||||
renderField (toField (Binary (unEntityId sId))),
|
||||
renderField (toField (Binary (C.encodePubKey rKey)))
|
||||
]
|
||||
|
||||
renderField :: Action -> Builder
|
||||
renderField = \case
|
||||
Plain bld -> bld
|
||||
Escape s -> BB.byteString s
|
||||
EscapeByteA s -> BB.string7 "\\x" <> BB.byteStringHex s
|
||||
EscapeIdentifier s -> BB.byteString s
|
||||
Many as -> mconcat (map renderField as)
|
||||
|
||||
nullable :: Maybe Action -> Builder
|
||||
nullable = maybe mempty renderField
|
||||
|
||||
Reference in New Issue
Block a user