mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-06-07 13:12:10 +00:00
feat: add PostgreSQL store skeleton with schema migration
This commit is contained in:
@@ -282,6 +282,9 @@ library
|
||||
Simplex.Messaging.Notifications.Server.Store.Postgres
|
||||
Simplex.Messaging.Notifications.Server.Store.Types
|
||||
Simplex.Messaging.Notifications.Server.StoreLog
|
||||
Simplex.FileTransfer.Server.Store.Postgres
|
||||
Simplex.FileTransfer.Server.Store.Postgres.Config
|
||||
Simplex.FileTransfer.Server.Store.Postgres.Migrations
|
||||
Simplex.Messaging.Server.MsgStore.Postgres
|
||||
Simplex.Messaging.Server.QueueStore.Postgres
|
||||
Simplex.Messaging.Server.QueueStore.Postgres.Migrations
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
{-# LANGUAGE CPP #-}
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE GADTs #-}
|
||||
@@ -32,6 +33,10 @@ import Simplex.FileTransfer.Protocol (FileCmd, FileInfo (..), XFTPFileId)
|
||||
import Simplex.FileTransfer.Server.Stats
|
||||
import Simplex.FileTransfer.Server.Store
|
||||
import Simplex.FileTransfer.Server.Store.STM (STMFileStore (..))
|
||||
#if defined(dbServerPostgres)
|
||||
import Simplex.FileTransfer.Server.Store.Postgres (PostgresFileStore)
|
||||
import Simplex.FileTransfer.Server.Store.Postgres.Config (PostgresFileStoreCfg)
|
||||
#endif
|
||||
import Simplex.FileTransfer.Server.StoreLog
|
||||
import Simplex.FileTransfer.Transport (VersionRangeXFTP)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
@@ -90,6 +95,9 @@ defaultInactiveClientExpiration =
|
||||
|
||||
data XFTPStoreConfig s where
|
||||
XSCMemory :: Maybe FilePath -> XFTPStoreConfig STMFileStore
|
||||
#if defined(dbServerPostgres)
|
||||
XSCDatabase :: PostgresFileStoreCfg -> XFTPStoreConfig PostgresFileStore
|
||||
#endif
|
||||
|
||||
data XFTPEnv s = XFTPEnv
|
||||
{ config :: XFTPServerConfig,
|
||||
@@ -121,6 +129,11 @@ newXFTPServerEnv storeCfg config@XFTPServerConfig {fileSizeQuota, xftpCredential
|
||||
st <- newFileStore ()
|
||||
sl <- mapM (`readWriteFileStore` st) storeLogPath
|
||||
pure (st, sl)
|
||||
#if defined(dbServerPostgres)
|
||||
XSCDatabase dbCfg -> do
|
||||
st <- newFileStore dbCfg
|
||||
pure (st, Nothing)
|
||||
#endif
|
||||
used <- getUsedStorage store
|
||||
usedStorage <- newTVarIO used
|
||||
forM_ fileSizeQuota $ \quota -> do
|
||||
|
||||
@@ -0,0 +1,104 @@
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE TypeFamilies #-}
|
||||
|
||||
module Simplex.FileTransfer.Server.Store.Postgres
|
||||
( PostgresFileStore (..),
|
||||
withDB,
|
||||
withDB',
|
||||
handleDuplicate,
|
||||
assertUpdated,
|
||||
withLog,
|
||||
)
|
||||
where
|
||||
|
||||
import qualified Control.Exception as E
|
||||
import Control.Logger.Simple
|
||||
import Control.Monad
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.Trans.Except (throwE)
|
||||
import Control.Monad.IO.Class
|
||||
import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
import Data.Text (Text)
|
||||
import Database.PostgreSQL.Simple (SqlError)
|
||||
import Database.PostgreSQL.Simple.Errors (ConstraintViolation (..), constraintViolation)
|
||||
import qualified Database.PostgreSQL.Simple as DB
|
||||
import GHC.IO (catchAny)
|
||||
import Simplex.FileTransfer.Server.Store
|
||||
import Simplex.FileTransfer.Server.Store.Postgres.Config
|
||||
import Simplex.FileTransfer.Server.Store.Postgres.Migrations (xftpServerMigrations)
|
||||
import Simplex.FileTransfer.Server.StoreLog
|
||||
import Simplex.FileTransfer.Transport (XFTPErrorType (..))
|
||||
import Simplex.Messaging.Agent.Store.Postgres (createDBStore, closeDBStore)
|
||||
import Simplex.Messaging.Agent.Store.Postgres.Common (DBStore, withTransaction)
|
||||
import Simplex.Messaging.Agent.Store.Shared (MigrationConfig (..))
|
||||
import Simplex.Messaging.Server.StoreLog (openWriteStoreLog)
|
||||
import Simplex.Messaging.Util (tshow)
|
||||
import System.Exit (exitFailure)
|
||||
import System.IO (IOMode (..))
|
||||
|
||||
data PostgresFileStore = PostgresFileStore
|
||||
{ dbStore :: DBStore,
|
||||
dbStoreLog :: Maybe (StoreLog 'WriteMode)
|
||||
}
|
||||
|
||||
instance FileStoreClass PostgresFileStore where
|
||||
type FileStoreConfig PostgresFileStore = PostgresFileStoreCfg
|
||||
|
||||
newFileStore PostgresFileStoreCfg {dbOpts, dbStoreLogPath, confirmMigrations} = do
|
||||
dbStore <- either err pure =<< createDBStore dbOpts xftpServerMigrations (MigrationConfig confirmMigrations Nothing)
|
||||
dbStoreLog <- mapM (openWriteStoreLog True) dbStoreLogPath
|
||||
pure PostgresFileStore {dbStore, dbStoreLog}
|
||||
where
|
||||
err e = do
|
||||
logError $ "STORE: newFileStore, error opening PostgreSQL database, " <> tshow e
|
||||
exitFailure
|
||||
|
||||
closeFileStore PostgresFileStore {dbStore, dbStoreLog} = do
|
||||
closeDBStore dbStore
|
||||
mapM_ closeStoreLog dbStoreLog
|
||||
|
||||
addFile _ _ _ _ _ = error "PostgresFileStore.addFile: not implemented"
|
||||
setFilePath _ _ _ = error "PostgresFileStore.setFilePath: not implemented"
|
||||
addRecipient _ _ _ = error "PostgresFileStore.addRecipient: not implemented"
|
||||
getFile _ _ _ = error "PostgresFileStore.getFile: not implemented"
|
||||
deleteFile _ _ = error "PostgresFileStore.deleteFile: not implemented"
|
||||
blockFile _ _ _ _ = error "PostgresFileStore.blockFile: not implemented"
|
||||
deleteRecipient _ _ _ = error "PostgresFileStore.deleteRecipient: not implemented"
|
||||
ackFile _ _ = error "PostgresFileStore.ackFile: not implemented"
|
||||
expiredFiles _ _ _ = error "PostgresFileStore.expiredFiles: not implemented"
|
||||
getUsedStorage _ = error "PostgresFileStore.getUsedStorage: not implemented"
|
||||
getFileCount _ = error "PostgresFileStore.getFileCount: not implemented"
|
||||
|
||||
-- Helpers
|
||||
|
||||
withDB :: forall a. Text -> PostgresFileStore -> (DB.Connection -> IO (Either XFTPErrorType a)) -> ExceptT XFTPErrorType IO a
|
||||
withDB op st action =
|
||||
ExceptT $ E.try (withTransaction (dbStore st) action) >>= either logErr pure
|
||||
where
|
||||
logErr :: E.SomeException -> IO (Either XFTPErrorType a)
|
||||
logErr e = logError ("STORE: " <> err) $> Left INTERNAL
|
||||
where
|
||||
err = op <> ", withDB, " <> tshow e
|
||||
|
||||
withDB' :: Text -> PostgresFileStore -> (DB.Connection -> IO a) -> ExceptT XFTPErrorType IO a
|
||||
withDB' op st action = withDB op st $ fmap Right . action
|
||||
|
||||
assertUpdated :: ExceptT XFTPErrorType IO Int64 -> ExceptT XFTPErrorType IO ()
|
||||
assertUpdated = (>>= \n -> when (n == 0) (throwE AUTH))
|
||||
|
||||
handleDuplicate :: SqlError -> IO (Either XFTPErrorType a)
|
||||
handleDuplicate e = case constraintViolation e of
|
||||
Just (UniqueViolation _) -> pure $ Left DUPLICATE_
|
||||
_ -> E.throwIO e
|
||||
|
||||
withLog :: MonadIO m => Text -> PostgresFileStore -> (StoreLog 'WriteMode -> IO ()) -> m ()
|
||||
withLog op PostgresFileStore {dbStoreLog} action =
|
||||
forM_ dbStoreLog $ \sl -> liftIO $ action sl `catchAny` \e ->
|
||||
logWarn $ "STORE: " <> op <> ", withLog, " <> tshow e
|
||||
@@ -0,0 +1,25 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Simplex.FileTransfer.Server.Store.Postgres.Config
|
||||
( PostgresFileStoreCfg (..),
|
||||
defaultXFTPDBOpts,
|
||||
)
|
||||
where
|
||||
|
||||
import Simplex.Messaging.Agent.Store.Postgres.Options (DBOpts (..))
|
||||
import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation)
|
||||
|
||||
data PostgresFileStoreCfg = PostgresFileStoreCfg
|
||||
{ dbOpts :: DBOpts,
|
||||
dbStoreLogPath :: Maybe FilePath,
|
||||
confirmMigrations :: MigrationConfirmation
|
||||
}
|
||||
|
||||
defaultXFTPDBOpts :: DBOpts
|
||||
defaultXFTPDBOpts =
|
||||
DBOpts
|
||||
{ connstr = "postgresql://xftp@/xftp_server_store",
|
||||
schema = "xftp_server",
|
||||
poolSize = 10,
|
||||
createSchema = False
|
||||
}
|
||||
@@ -0,0 +1,47 @@
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
|
||||
module Simplex.FileTransfer.Server.Store.Postgres.Migrations
|
||||
( xftpServerMigrations,
|
||||
)
|
||||
where
|
||||
|
||||
import Data.List (sortOn)
|
||||
import Data.Text (Text)
|
||||
import Simplex.Messaging.Agent.Store.Shared
|
||||
import Text.RawString.QQ (r)
|
||||
|
||||
xftpSchemaMigrations :: [(String, Text, Maybe Text)]
|
||||
xftpSchemaMigrations =
|
||||
[ ("20260325_initial", m20260325_initial, Nothing)
|
||||
]
|
||||
|
||||
-- | The list of migrations in ascending order by date
|
||||
xftpServerMigrations :: [Migration]
|
||||
xftpServerMigrations = sortOn name $ map migration xftpSchemaMigrations
|
||||
where
|
||||
migration (name, up, down) = Migration {name, up, down = down}
|
||||
|
||||
m20260325_initial :: Text
|
||||
m20260325_initial =
|
||||
[r|
|
||||
CREATE TABLE files (
|
||||
sender_id BYTEA NOT NULL PRIMARY KEY,
|
||||
file_size INT4 NOT NULL,
|
||||
file_digest BYTEA NOT NULL,
|
||||
sender_key BYTEA NOT NULL,
|
||||
file_path TEXT,
|
||||
created_at INT8 NOT NULL,
|
||||
status TEXT NOT NULL DEFAULT 'active'
|
||||
);
|
||||
|
||||
CREATE TABLE recipients (
|
||||
recipient_id BYTEA NOT NULL PRIMARY KEY,
|
||||
sender_id BYTEA NOT NULL REFERENCES files ON DELETE CASCADE,
|
||||
recipient_key BYTEA NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX idx_recipients_sender_id ON recipients (sender_id);
|
||||
CREATE INDEX idx_files_created_at ON files (created_at);
|
||||
|]
|
||||
Reference in New Issue
Block a user