From aacd873dff460118f45db4eced981baea88f4f7f Mon Sep 17 00:00:00 2001 From: shum Date: Wed, 1 Apr 2026 14:54:40 +0000 Subject: [PATCH] feat: add database import/export CLI commands --- src/Simplex/FileTransfer/Server/Env.hs | 24 ++- src/Simplex/FileTransfer/Server/Main.hs | 33 +++- .../FileTransfer/Server/Store/Postgres.hs | 154 +++++++++++++++++- 3 files changed, 204 insertions(+), 7 deletions(-) diff --git a/src/Simplex/FileTransfer/Server/Env.hs b/src/Simplex/FileTransfer/Server/Env.hs index e5289ecb6..73773ff88 100644 --- a/src/Simplex/FileTransfer/Server/Env.hs +++ b/src/Simplex/FileTransfer/Server/Env.hs @@ -20,6 +20,8 @@ module Simplex.FileTransfer.Server.Env newXFTPServerEnv, runWithStoreConfig, checkFileStoreMode, + importToDatabase, + exportFromDatabase, ) where import Control.Logger.Simple @@ -40,7 +42,7 @@ import Simplex.FileTransfer.Server.Store.STM (STMFileStore (..)) import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation) #if defined(dbServerPostgres) import Data.Functor (($>)) -import Simplex.FileTransfer.Server.Store.Postgres (PostgresFileStore) +import Simplex.FileTransfer.Server.Store.Postgres (PostgresFileStore, importFileStore, exportFileStore) import Simplex.FileTransfer.Server.Store.Postgres.Config (PostgresFileStoreCfg (..), defaultXFTPDBOpts) import Simplex.Messaging.Server.CLI (iniDBOptions, settingIsOn) import System.Directory (doesFileExist) @@ -200,3 +202,23 @@ checkFileStoreMode ini storeType storeLogFilePath = case storeType of #else checkFileStoreMode _ _ _ = pure () #endif + +-- | Import StoreLog to PostgreSQL database. +importToDatabase :: FilePath -> Ini -> MigrationConfirmation -> IO () +#if defined(dbServerPostgres) +importToDatabase storeLogFilePath ini _confirmMigrations = do + let dbCfg = PostgresFileStoreCfg {dbOpts = iniDBOptions ini defaultXFTPDBOpts, dbStoreLogPath = Nothing, confirmMigrations = _confirmMigrations} + importFileStore storeLogFilePath dbCfg +#else +importToDatabase _ _ _ = error "Error: server binary is compiled without support for PostgreSQL database.\nPlease re-compile with `cabal build -fserver_postgres`." +#endif + +-- | Export PostgreSQL database to StoreLog. +exportFromDatabase :: FilePath -> Ini -> MigrationConfirmation -> IO () +#if defined(dbServerPostgres) +exportFromDatabase storeLogFilePath ini _confirmMigrations = do + let dbCfg = PostgresFileStoreCfg {dbOpts = iniDBOptions ini defaultXFTPDBOpts, dbStoreLogPath = Nothing, confirmMigrations = _confirmMigrations} + exportFileStore storeLogFilePath dbCfg +#else +exportFromDatabase _ _ _ = error "Error: server binary is compiled without support for PostgreSQL database.\nPlease re-compile with `cabal build -fserver_postgres`." +#endif diff --git a/src/Simplex/FileTransfer/Server/Main.hs b/src/Simplex/FileTransfer/Server/Main.hs index f39825aa3..9f5045300 100644 --- a/src/Simplex/FileTransfer/Server/Main.hs +++ b/src/Simplex/FileTransfer/Server/Main.hs @@ -12,7 +12,7 @@ module Simplex.FileTransfer.Server.Main xftpServerCLI_, ) where -import Control.Monad (when) +import Control.Monad (unless, when) import Data.Either (fromRight) import Data.Functor (($>)) import Data.Ini (lookupValue, readIniFile) @@ -28,7 +28,7 @@ import Options.Applicative import Simplex.FileTransfer.Chunks import Simplex.FileTransfer.Description (FileSize (..)) import Simplex.FileTransfer.Server (runXFTPServer) -import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..), defFileExpirationHours, defaultFileExpiration, defaultInactiveClientExpiration, runWithStoreConfig, checkFileStoreMode) +import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..), defFileExpirationHours, defaultFileExpiration, defaultInactiveClientExpiration, runWithStoreConfig, checkFileStoreMode, importToDatabase, exportFromDatabase) import Simplex.FileTransfer.Transport (alpnSupportedXFTPhandshakes, supportedFileServerVRange) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String @@ -71,6 +71,10 @@ xftpServerCLI_ generateSite serveStaticFiles cfgPath logPath = do doesFileExist iniFile >>= \case True -> readIniFile iniFile >>= either exitError (runServer opts) _ -> exitError $ "Error: server is not initialized (" <> iniFile <> " does not exist).\nRun `" <> executableName <> " init`." + Database cmd -> + doesFileExist iniFile >>= \case + True -> readIniFile iniFile >>= either exitError (runDatabaseCmd cmd) + _ -> exitError $ "Error: server is not initialized (" <> iniFile <> " does not exist).\nRun `" <> executableName <> " init`." Delete -> do confirmOrExit "WARNING: deleting the server will make all queues inaccessible, because the server identity (certificate fingerprint) will change.\nTHIS CANNOT BE UNDONE!" @@ -85,6 +89,21 @@ xftpServerCLI_ generateSite serveStaticFiles cfgPath logPath = do executableName = "file-server" storeLogFilePath = combine logPath "file-server-store.log" defaultStaticPath = combine logPath "www" + runDatabaseCmd cmd ini = case cmd of + SCImport -> do + storeLogExists <- doesFileExist storeLogFilePath + unless storeLogExists $ exitError $ "Error: store log file " <> storeLogFilePath <> " does not exist." + confirmOrExit + ("Import store log " <> storeLogFilePath <> " to PostgreSQL database?") + "Import cancelled." + importToDatabase storeLogFilePath ini MCYesUp + SCExport -> do + storeLogExists <- doesFileExist storeLogFilePath + when storeLogExists $ exitError $ "Error: store log file " <> storeLogFilePath <> " already exists." + confirmOrExit + ("Export PostgreSQL database to store log " <> storeLogFilePath <> "?") + "Export cancelled." + exportFromDatabase storeLogFilePath ini MCConsole initializeServer InitOptions {enableStoreLog, signAlgorithm, ip, fqdn, filesPath, fileSizeQuota, webStaticPath = webStaticPath_} = do clearDirIfExists cfgPath clearDirIfExists logPath @@ -302,8 +321,11 @@ data CliCommand = Init InitOptions | OnlineCert CertOptions | Start StartOptions + | Database StoreCmd | Delete +data StoreCmd = SCImport | SCExport + newtype StartOptions = StartOptions { confirmMigrations :: MigrationConfirmation } @@ -325,6 +347,7 @@ cliCommandP cfgPath logPath iniFile = ( command "init" (info (Init <$> initP) (progDesc $ "Initialize server - creates " <> cfgPath <> " and " <> logPath <> " directories and configuration files")) <> command "cert" (info (OnlineCert <$> certOptionsP) (progDesc $ "Generate new online TLS server credentials (configuration: " <> iniFile <> ")")) <> command "start" (info (Start <$> startOptsP) (progDesc $ "Start server (configuration: " <> iniFile <> ")")) + <> command "database" (info (Database <$> storeCmdP) (progDesc "Import/export file store to/from PostgreSQL database")) <> command "delete" (info (pure Delete) (progDesc "Delete configuration and log files")) ) where @@ -408,3 +431,9 @@ cliCommandP cfgPath logPath iniFile = "up" -> Right MCYesUp "down" -> Right MCYesUpDown _ -> Left "invalid migration confirmation, pass 'up' or 'down'" + storeCmdP :: Parser StoreCmd + storeCmdP = + hsubparser + ( command "import" (info (pure SCImport) (progDesc "Import store log file into PostgreSQL database")) + <> command "export" (info (pure SCExport) (progDesc "Export PostgreSQL database to store log file")) + ) diff --git a/src/Simplex/FileTransfer/Server/Store/Postgres.hs b/src/Simplex/FileTransfer/Server/Store/Postgres.hs index fea00fbc9..08ca4ce98 100644 --- a/src/Simplex/FileTransfer/Server/Store/Postgres.hs +++ b/src/Simplex/FileTransfer/Server/Store/Postgres.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE BangPatterns #-} {-# LANGUAGE DataKinds #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} @@ -13,6 +14,8 @@ module Simplex.FileTransfer.Server.Store.Postgres handleDuplicate, assertUpdated, withLog, + importFileStore, + exportFileStore, ) where @@ -22,32 +25,45 @@ import Control.Monad import Control.Monad.Except import Control.Monad.IO.Class import Control.Monad.Trans.Except (throwE) +import Data.ByteString (ByteString) +import Data.ByteString.Builder (Builder) +import qualified Data.ByteString.Builder as BB +import qualified Data.ByteString.Lazy as LB import Data.Functor (($>)) import Data.Int (Int32, Int64) +import Data.List (intersperse) +import qualified Data.List.NonEmpty as L +import qualified Data.Map.Strict as M import qualified Data.Set as S import Data.Text (Text) import Data.Word (Word32) import Database.PostgreSQL.Simple (Binary (..), Only (..), SqlError) -import Database.PostgreSQL.Simple.Errors (ConstraintViolation (..), constraintViolation) import qualified Database.PostgreSQL.Simple as DB +import qualified Database.PostgreSQL.Simple.Copy as DB +import Database.PostgreSQL.Simple.Errors (ConstraintViolation (..), constraintViolation) +import Database.PostgreSQL.Simple.ToField (Action (..), ToField (..)) import GHC.IO (catchAny) import Simplex.FileTransfer.Protocol (FileInfo (..), SFileParty (..)) import Simplex.FileTransfer.Server.Store import Simplex.FileTransfer.Server.Store.Postgres.Config import Simplex.FileTransfer.Server.Store.Postgres.Migrations (xftpServerMigrations) +import Simplex.FileTransfer.Server.Store.STM (STMFileStore (..)) import Simplex.FileTransfer.Server.StoreLog import Simplex.FileTransfer.Transport (XFTPErrorType (..)) import Simplex.Messaging.Agent.Store.Postgres (closeDBStore, createDBStore) import Simplex.Messaging.Agent.Store.Postgres.Common (DBStore, withTransaction) -import Simplex.Messaging.Agent.Store.Shared (MigrationConfig (..)) +import Simplex.Messaging.Agent.Store.Postgres.Options (DBOpts (..)) +import Simplex.Messaging.Agent.Store.Shared (MigrationConfig (..), MigrationConfirmation (..)) import qualified Simplex.Messaging.Crypto as C -import Simplex.Messaging.Protocol (SenderId) +import Simplex.Messaging.Protocol (RcvPublicAuthKey, RecipientId, SenderId) +import Simplex.Messaging.Transport (EntityId (..)) import Simplex.Messaging.Server.QueueStore (ServerEntityStatus (..)) import Simplex.Messaging.Server.QueueStore.Postgres () import Simplex.Messaging.Server.StoreLog (openWriteStoreLog) import Simplex.Messaging.Util (tshow) +import System.Directory (renameFile) import System.Exit (exitFailure) -import System.IO (IOMode (..)) +import System.IO (IOMode (..), hFlush, stdout) import UnliftIO.STM data PostgresFileStore = PostgresFileStore @@ -207,3 +223,133 @@ withLog :: MonadIO m => Text -> PostgresFileStore -> (StoreLog 'WriteMode -> IO withLog op PostgresFileStore {dbStoreLog} action = forM_ dbStoreLog $ \sl -> liftIO $ action sl `catchAny` \e -> logWarn $ "STORE: " <> op <> ", withLog, " <> tshow e + +-- Import: StoreLog -> PostgreSQL + +importFileStore :: FilePath -> PostgresFileStoreCfg -> IO () +importFileStore storeLogFilePath dbCfg = do + putStrLn $ "Reading store log: " <> storeLogFilePath + stmStore <- newFileStore () :: IO STMFileStore + sl <- readWriteFileStore storeLogFilePath stmStore + closeStoreLog sl + allFiles <- readTVarIO (files stmStore) + allRcps <- readTVarIO (recipients stmStore) + let fileCount = M.size allFiles + rcpCount = M.size allRcps + putStrLn $ "Loaded " <> show fileCount <> " files, " <> show rcpCount <> " recipients." + let dbCfg' = dbCfg {dbOpts = (dbOpts dbCfg) {createSchema = True}, confirmMigrations = MCYesUp} + pgStore <- newFileStore dbCfg' :: IO PostgresFileStore + putStrLn "Importing files..." + fCnt <- withTransaction (dbStore pgStore) $ \db -> do + DB.copy_ + db + "COPY files (sender_id, file_size, file_digest, sender_key, file_path, created_at, status) FROM STDIN WITH (FORMAT csv)" + iforM_ (M.toList allFiles) $ \i (sId, fr) -> do + DB.putCopyData db =<< fileRecToCSV sId fr + when (i > 0 && i `mod` 10000 == 0) $ putStr (" " <> show i <> " files\r") >> hFlush stdout + DB.putCopyEnd db + [Only cnt] <- DB.query_ db "SELECT COUNT(*) FROM files" + pure (cnt :: Int64) + putStrLn $ "Imported " <> show fCnt <> " files." + putStrLn "Importing recipients..." + rCnt <- withTransaction (dbStore pgStore) $ \db -> do + DB.copy_ + db + "COPY recipients (recipient_id, sender_id, recipient_key) FROM STDIN WITH (FORMAT csv)" + iforM_ (M.toList allRcps) $ \i (rId, (sId, rKey)) -> do + DB.putCopyData db $ recipientToCSV rId sId rKey + when (i > 0 && i `mod` 10000 == 0) $ putStr (" " <> show i <> " recipients\r") >> hFlush stdout + DB.putCopyEnd db + [Only cnt] <- DB.query_ db "SELECT COUNT(*) FROM recipients" + pure (cnt :: Int64) + putStrLn $ "Imported " <> show rCnt <> " recipients." + when (fromIntegral fileCount /= fCnt) $ + putStrLn $ "WARNING: expected " <> show fileCount <> " files, got " <> show fCnt + when (fromIntegral rcpCount /= rCnt) $ + putStrLn $ "WARNING: expected " <> show rcpCount <> " recipients, got " <> show rCnt + closeFileStore pgStore + renameFile storeLogFilePath (storeLogFilePath <> ".bak") + putStrLn $ "Store log renamed to " <> storeLogFilePath <> ".bak" + +-- Export: PostgreSQL -> StoreLog + +exportFileStore :: FilePath -> PostgresFileStoreCfg -> IO () +exportFileStore storeLogFilePath dbCfg = do + pgStore <- newFileStore dbCfg :: IO PostgresFileStore + sl <- openWriteStoreLog False storeLogFilePath + putStrLn "Exporting files..." + -- Load all recipients into a map for lookup + rcpMap <- withTransaction (dbStore pgStore) $ \db -> + DB.fold_ + db + "SELECT recipient_id, sender_id, recipient_key FROM recipients ORDER BY sender_id" + M.empty + (\acc (rId, sId, rKeyBs :: ByteString) -> + case C.decodePubKey rKeyBs of + Right rKey -> pure $! M.insertWith (++) sId [FileRecipient rId rKey] acc + Left _ -> putStrLn ("WARNING: invalid recipient key for " <> show rId) $> acc) + -- Fold over files, writing StoreLog records + (!fCnt, !rCnt) <- withTransaction (dbStore pgStore) $ \db -> + DB.fold_ + db + "SELECT sender_id, file_size, file_digest, sender_key, file_path, created_at, status FROM files ORDER BY created_at" + (0 :: Int, 0 :: Int) + ( \(!fc, !rc) (sId, size :: Int32, digest :: ByteString, sndKeyBs :: ByteString, path :: Maybe String, createdAt, status) -> + case C.decodePubKey sndKeyBs of + Right sndKey -> do + let fileInfo = FileInfo {sndKey, size = fromIntegral size, digest} + logAddFile sl sId fileInfo createdAt status + let rcps = M.findWithDefault [] sId rcpMap + rc' = rc + length rcps + forM_ (L.nonEmpty rcps) $ logAddRecipients sl sId + forM_ path $ logPutFile sl sId + pure (fc + 1, rc') + Left _ -> do + putStrLn $ "WARNING: invalid sender key for " <> show sId + pure (fc, rc) + ) + closeStoreLog sl + closeFileStore pgStore + putStrLn $ "Exported " <> show fCnt <> " files, " <> show rCnt <> " recipients to " <> storeLogFilePath + +-- CSV helpers for COPY protocol + +iforM_ :: Monad m => [a] -> (Int -> a -> m ()) -> m () +iforM_ xs f = zipWithM_ f [0 ..] xs + +fileRecToCSV :: SenderId -> FileRec -> IO ByteString +fileRecToCSV sId FileRec {fileInfo = FileInfo {sndKey, size, digest}, filePath, createdAt, fileStatus} = do + path <- readTVarIO filePath + status <- readTVarIO fileStatus + pure $ LB.toStrict $ BB.toLazyByteString $ mconcat (BB.char7 ',' `intersperse` fields path status) <> BB.char7 '\n' + where + fields path status = + [ renderField (toField (Binary (unEntityId sId))), + renderField (toField (fromIntegral size :: Int32)), + renderField (toField (Binary digest)), + renderField (toField (Binary (C.encodePubKey sndKey))), + nullable (toField <$> path), + renderField (toField createdAt), + BB.char7 '"' <> renderField (toField status) <> BB.char7 '"' + ] + +recipientToCSV :: RecipientId -> SenderId -> RcvPublicAuthKey -> ByteString +recipientToCSV rId sId rKey = + LB.toStrict $ BB.toLazyByteString $ mconcat (BB.char7 ',' `intersperse` fields) <> BB.char7 '\n' + where + fields = + [ renderField (toField (Binary (unEntityId rId))), + renderField (toField (Binary (unEntityId sId))), + renderField (toField (Binary (C.encodePubKey rKey))) + ] + +renderField :: Action -> Builder +renderField = \case + Plain bld -> bld + Escape s -> BB.byteString s + EscapeByteA s -> BB.string7 "\\x" <> BB.byteStringHex s + EscapeIdentifier s -> BB.byteString s + Many as -> mconcat (map renderField as) + +nullable :: Maybe Action -> Builder +nullable = maybe mempty renderField