mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-26 21:45:15 +00:00
feat: implement PostgresFileStore operations
This commit is contained in:
@@ -3,7 +3,6 @@
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE TypeFamilies #-}
|
||||
|
||||
@@ -21,27 +20,35 @@ import qualified Control.Exception as E
|
||||
import Control.Logger.Simple
|
||||
import Control.Monad
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.Trans.Except (throwE)
|
||||
import Control.Monad.IO.Class
|
||||
import Control.Monad.Trans.Except (throwE)
|
||||
import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
import Data.Int (Int32, Int64)
|
||||
import qualified Data.Set as S
|
||||
import Data.Text (Text)
|
||||
import Database.PostgreSQL.Simple (SqlError)
|
||||
import Data.Word (Word32)
|
||||
import Database.PostgreSQL.Simple (Binary (..), Only (..), SqlError)
|
||||
import Database.PostgreSQL.Simple.Errors (ConstraintViolation (..), constraintViolation)
|
||||
import qualified Database.PostgreSQL.Simple as DB
|
||||
import GHC.IO (catchAny)
|
||||
import Simplex.FileTransfer.Protocol (FileInfo (..), SFileParty (..))
|
||||
import Simplex.FileTransfer.Server.Store
|
||||
import Simplex.FileTransfer.Server.Store.Postgres.Config
|
||||
import Simplex.FileTransfer.Server.Store.Postgres.Migrations (xftpServerMigrations)
|
||||
import Simplex.FileTransfer.Server.StoreLog
|
||||
import Simplex.FileTransfer.Transport (XFTPErrorType (..))
|
||||
import Simplex.Messaging.Agent.Store.Postgres (createDBStore, closeDBStore)
|
||||
import Simplex.Messaging.Agent.Store.Postgres (closeDBStore, createDBStore)
|
||||
import Simplex.Messaging.Agent.Store.Postgres.Common (DBStore, withTransaction)
|
||||
import Simplex.Messaging.Agent.Store.Shared (MigrationConfig (..))
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Protocol (SenderId)
|
||||
import Simplex.Messaging.Server.QueueStore (ServerEntityStatus (..))
|
||||
import Simplex.Messaging.Server.QueueStore.Postgres ()
|
||||
import Simplex.Messaging.Server.StoreLog (openWriteStoreLog)
|
||||
import Simplex.Messaging.Util (tshow)
|
||||
import System.Exit (exitFailure)
|
||||
import System.IO (IOMode (..))
|
||||
import UnliftIO.STM
|
||||
|
||||
data PostgresFileStore = PostgresFileStore
|
||||
{ dbStore :: DBStore,
|
||||
@@ -64,19 +71,117 @@ instance FileStoreClass PostgresFileStore where
|
||||
closeDBStore dbStore
|
||||
mapM_ closeStoreLog dbStoreLog
|
||||
|
||||
addFile _ _ _ _ _ = error "PostgresFileStore.addFile: not implemented"
|
||||
setFilePath _ _ _ = error "PostgresFileStore.setFilePath: not implemented"
|
||||
addRecipient _ _ _ = error "PostgresFileStore.addRecipient: not implemented"
|
||||
getFile _ _ _ = error "PostgresFileStore.getFile: not implemented"
|
||||
deleteFile _ _ = error "PostgresFileStore.deleteFile: not implemented"
|
||||
blockFile _ _ _ _ = error "PostgresFileStore.blockFile: not implemented"
|
||||
deleteRecipient _ _ _ = error "PostgresFileStore.deleteRecipient: not implemented"
|
||||
ackFile _ _ = error "PostgresFileStore.ackFile: not implemented"
|
||||
expiredFiles _ _ _ = error "PostgresFileStore.expiredFiles: not implemented"
|
||||
getUsedStorage _ = error "PostgresFileStore.getUsedStorage: not implemented"
|
||||
getFileCount _ = error "PostgresFileStore.getFileCount: not implemented"
|
||||
addFile st sId fileInfo@FileInfo {sndKey, size, digest} createdAt status =
|
||||
E.uninterruptibleMask_ $ runExceptT $ do
|
||||
void $ withDB "addFile" st $ \db ->
|
||||
E.try
|
||||
( DB.execute
|
||||
db
|
||||
"INSERT INTO files (sender_id, file_size, file_digest, sender_key, created_at, status) VALUES (?,?,?,?,?,?)"
|
||||
(sId, (fromIntegral size :: Int32), Binary digest, Binary (C.encodePubKey sndKey), createdAt, status)
|
||||
)
|
||||
>>= either handleDuplicate (pure . Right)
|
||||
withLog "addFile" st $ \s -> logAddFile s sId fileInfo createdAt status
|
||||
|
||||
-- Helpers
|
||||
setFilePath st sId fPath = E.uninterruptibleMask_ $ runExceptT $ do
|
||||
assertUpdated $ withDB' "setFilePath" st $ \db ->
|
||||
DB.execute db "UPDATE files SET file_path = ? WHERE sender_id = ? AND file_path IS NULL" (fPath, sId)
|
||||
withLog "setFilePath" st $ \s -> logPutFile s sId fPath
|
||||
|
||||
addRecipient st senderId (FileRecipient rId rKey) = E.uninterruptibleMask_ $ runExceptT $ do
|
||||
void $ withDB "addRecipient" st $ \db ->
|
||||
E.try
|
||||
( DB.execute
|
||||
db
|
||||
"INSERT INTO recipients (recipient_id, sender_id, recipient_key) VALUES (?,?,?)"
|
||||
(rId, senderId, Binary (C.encodePubKey rKey))
|
||||
)
|
||||
>>= either handleDuplicate (pure . Right)
|
||||
withLog "addRecipient" st $ \s -> logAddRecipients s senderId (pure $ FileRecipient rId rKey)
|
||||
|
||||
getFile st party fId = runExceptT $ case party of
|
||||
SFSender ->
|
||||
withDB "getFile" st $ \db -> do
|
||||
rs <-
|
||||
DB.query
|
||||
db
|
||||
"SELECT file_size, file_digest, sender_key, file_path, created_at, status FROM files WHERE sender_id = ?"
|
||||
(Only fId)
|
||||
case rs of
|
||||
[(size, digest, sndKeyBs, path, createdAt, status)] ->
|
||||
case C.decodePubKey sndKeyBs of
|
||||
Right sndKey -> do
|
||||
let fileInfo = FileInfo {sndKey, size = fromIntegral (size :: Int32), digest}
|
||||
fr <- mkFileRec fId fileInfo path createdAt status
|
||||
pure $ Right (fr, sndKey)
|
||||
Left _ -> pure $ Left INTERNAL
|
||||
_ -> pure $ Left AUTH
|
||||
SFRecipient ->
|
||||
withDB "getFile" st $ \db -> do
|
||||
rs <-
|
||||
DB.query
|
||||
db
|
||||
"SELECT f.file_size, f.file_digest, f.sender_key, f.file_path, f.created_at, f.status, f.sender_id, r.recipient_key FROM recipients r JOIN files f ON r.sender_id = f.sender_id WHERE r.recipient_id = ?"
|
||||
(Only fId)
|
||||
case rs of
|
||||
[(size, digest, sndKeyBs, path, createdAt, status, senderId, rcpKeyBs)] ->
|
||||
case (C.decodePubKey sndKeyBs, C.decodePubKey rcpKeyBs) of
|
||||
(Right sndKey, Right rcpKey) -> do
|
||||
let fileInfo = FileInfo {sndKey, size = fromIntegral (size :: Int32), digest}
|
||||
fr <- mkFileRec senderId fileInfo path createdAt status
|
||||
pure $ Right (fr, rcpKey)
|
||||
_ -> pure $ Left INTERNAL
|
||||
_ -> pure $ Left AUTH
|
||||
|
||||
deleteFile st sId = E.uninterruptibleMask_ $ runExceptT $ do
|
||||
assertUpdated $ withDB' "deleteFile" st $ \db ->
|
||||
DB.execute db "DELETE FROM files WHERE sender_id = ?" (Only sId)
|
||||
withLog "deleteFile" st $ \s -> logDeleteFile s sId
|
||||
|
||||
blockFile st sId info _deleted = E.uninterruptibleMask_ $ runExceptT $ do
|
||||
assertUpdated $ withDB' "blockFile" st $ \db ->
|
||||
DB.execute db "UPDATE files SET status = ? WHERE sender_id = ?" (EntityBlocked info, sId)
|
||||
withLog "blockFile" st $ \s -> logBlockFile s sId info
|
||||
|
||||
deleteRecipient st rId _fr =
|
||||
void $ runExceptT $ withDB' "deleteRecipient" st $ \db ->
|
||||
DB.execute db "DELETE FROM recipients WHERE recipient_id = ?" (Only rId)
|
||||
|
||||
ackFile st rId = E.uninterruptibleMask_ $ runExceptT $ do
|
||||
assertUpdated $ withDB' "ackFile" st $ \db ->
|
||||
DB.execute db "DELETE FROM recipients WHERE recipient_id = ?" (Only rId)
|
||||
withLog "ackFile" st $ \s -> logAckFile s rId
|
||||
|
||||
expiredFiles st old limit =
|
||||
fmap toResult $ withTransaction (dbStore st) $ \db ->
|
||||
DB.query
|
||||
db
|
||||
"SELECT sender_id, file_path, file_size FROM files WHERE created_at + ? < ? ORDER BY created_at LIMIT ?"
|
||||
(fileTimePrecision, old, limit)
|
||||
where
|
||||
toResult :: [(SenderId, Maybe FilePath, Int32)] -> [(SenderId, Maybe FilePath, Word32)]
|
||||
toResult = map (\(sId, path, size) -> (sId, path, fromIntegral size))
|
||||
|
||||
getUsedStorage st =
|
||||
withTransaction (dbStore st) $ \db -> do
|
||||
[Only total] <- DB.query_ db "SELECT COALESCE(SUM(file_size::INT8), 0) FROM files"
|
||||
pure total
|
||||
|
||||
getFileCount st =
|
||||
withTransaction (dbStore st) $ \db -> do
|
||||
[Only count] <- DB.query_ db "SELECT COUNT(*) FROM files"
|
||||
pure (fromIntegral (count :: Int64))
|
||||
|
||||
-- Internal helpers
|
||||
|
||||
mkFileRec :: SenderId -> FileInfo -> Maybe FilePath -> RoundedFileTime -> ServerEntityStatus -> IO FileRec
|
||||
mkFileRec senderId fileInfo path createdAt status = do
|
||||
filePath <- newTVarIO path
|
||||
recipientIds <- newTVarIO S.empty
|
||||
fileStatus <- newTVarIO status
|
||||
pure FileRec {senderId, fileInfo, filePath, recipientIds, createdAt, fileStatus}
|
||||
|
||||
-- DB helpers
|
||||
|
||||
withDB :: forall a. Text -> PostgresFileStore -> (DB.Connection -> IO (Either XFTPErrorType a)) -> ExceptT XFTPErrorType IO a
|
||||
withDB op st action =
|
||||
|
||||
Reference in New Issue
Block a user