refactor: extract FileStoreClass typeclass, move STM impl to Store.STM

This commit is contained in:
shum
2026-04-01 13:22:14 +00:00
parent b0da98273b
commit 6f4bf647ed
6 changed files with 166 additions and 135 deletions

View File

@@ -51,6 +51,7 @@ import Simplex.FileTransfer.Server.Env
import Simplex.FileTransfer.Server.Prometheus
import Simplex.FileTransfer.Server.Stats
import Simplex.FileTransfer.Server.Store
import Simplex.FileTransfer.Server.Store.STM (STMFileStore)
import Simplex.FileTransfer.Server.StoreLog
import Simplex.FileTransfer.Transport
import qualified Simplex.Messaging.Crypto as C
@@ -500,12 +501,12 @@ processXFTPRequest HTTP2Body {bodyPart} = \case
let rIds = L.map (\(FileRecipient rId _) -> rId) rcps
pure $ FRSndIds sId rIds
pure $ either FRErr id r
addFileRetry :: FileStore -> FileInfo -> Int -> RoundedFileTime -> M (Either XFTPErrorType XFTPFileId)
addFileRetry :: STMFileStore -> FileInfo -> Int -> RoundedFileTime -> M (Either XFTPErrorType XFTPFileId)
addFileRetry st file n ts =
retryAdd n $ \sId -> runExceptT $ do
ExceptT $ addFile st sId file ts EntityActive
pure sId
addRecipientRetry :: FileStore -> Int -> XFTPFileId -> RcvPublicAuthKey -> M (Either XFTPErrorType FileRecipient)
addRecipientRetry :: STMFileStore -> Int -> XFTPFileId -> RcvPublicAuthKey -> M (Either XFTPErrorType FileRecipient)
addRecipientRetry st n sId rpk =
retryAdd n $ \rId -> runExceptT $ do
let rcp = FileRecipient rId rpk
@@ -616,7 +617,7 @@ blockServerFile fr@FileRec {senderId} info = do
withFileLog $ \sl -> logBlockFile sl senderId info
deleteOrBlockServerFile_ fr filesBlocked $ \st -> blockFile st senderId info True
deleteOrBlockServerFile_ :: FileRec -> (FileServerStats -> IORef Int) -> (FileStore -> IO (Either XFTPErrorType ())) -> M (Either XFTPErrorType ())
deleteOrBlockServerFile_ :: FileRec -> (FileServerStats -> IORef Int) -> (STMFileStore -> IO (Either XFTPErrorType ())) -> M (Either XFTPErrorType ())
deleteOrBlockServerFile_ FileRec {filePath, fileInfo} stat storeAction = runExceptT $ do
path <- readTVarIO filePath
stats <- asks serverStats

View File

@@ -30,6 +30,7 @@ import qualified Network.TLS as T
import Simplex.FileTransfer.Protocol (FileCmd, FileInfo (..), XFTPFileId)
import Simplex.FileTransfer.Server.Stats
import Simplex.FileTransfer.Server.Store
import Simplex.FileTransfer.Server.Store.STM (STMFileStore (..))
import Simplex.FileTransfer.Server.StoreLog
import Simplex.FileTransfer.Transport (VersionRangeXFTP)
import qualified Simplex.Messaging.Crypto as C
@@ -88,7 +89,7 @@ defaultInactiveClientExpiration =
data XFTPEnv = XFTPEnv
{ config :: XFTPServerConfig,
store :: FileStore,
store :: STMFileStore,
usedStorage :: TVar Int64,
storeLog :: Maybe (StoreLog 'WriteMode),
random :: TVar ChaChaDRG,
@@ -111,7 +112,7 @@ defaultFileExpiration =
newXFTPServerEnv :: XFTPServerConfig -> IO XFTPEnv
newXFTPServerEnv config@XFTPServerConfig {storeLogFile, fileSizeQuota, xftpCredentials, httpCredentials} = do
random <- C.newRandom
store <- newFileStore
store <- newFileStore ()
storeLog <- mapM (`readWriteFileStore` store) storeLogFile
used <- getUsedStorage store
usedStorage <- newTVarIO used

View File

@@ -1,55 +1,30 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeFamilies #-}
module Simplex.FileTransfer.Server.Store
( FileStore (..),
( FileStoreClass (..),
FileRec (..),
FileRecipient (..),
RoundedFileTime,
newFileStore,
addFile,
setFilePath,
addRecipient,
deleteFile,
blockFile,
deleteRecipient,
getFile,
ackFile,
expiredFiles,
getUsedStorage,
getFileCount,
fileTimePrecision,
)
where
import Control.Concurrent.STM
import Control.Monad (forM)
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.Int (Int64)
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes)
import Data.Set (Set)
import qualified Data.Set as S
import Data.Word (Word32)
import Simplex.FileTransfer.Protocol (FileInfo (..), SFileParty (..), XFTPFileId)
import Simplex.FileTransfer.Transport (XFTPErrorType (..))
import Simplex.FileTransfer.Protocol (FileInfo (..), SFileParty, XFTPFileId)
import Simplex.FileTransfer.Transport (XFTPErrorType)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (BlockingInfo, RcvPublicAuthKey, RecipientId, SenderId)
import Simplex.Messaging.Server.QueueStore (ServerEntityStatus (..))
import Simplex.Messaging.Protocol (BlockingInfo, RecipientId, SenderId)
import Simplex.Messaging.Server.QueueStore (ServerEntityStatus)
import Simplex.Messaging.SystemTime
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util (ifM)
data FileStore = FileStore
{ files :: TMap SenderId FileRec,
recipients :: TMap RecipientId (SenderId, RcvPublicAuthKey)
}
data FileRec = FileRec
{ senderId :: SenderId,
@@ -65,108 +40,33 @@ type RoundedFileTime = RoundedSystemTime 3600
fileTimePrecision :: Int64
fileTimePrecision = 3600 -- truncate creation time to 1 hour
data FileRecipient = FileRecipient RecipientId RcvPublicAuthKey
data FileRecipient = FileRecipient RecipientId C.APublicAuthKey
deriving (Show)
instance StrEncoding FileRecipient where
strEncode (FileRecipient rId rKey) = strEncode rId <> ":" <> strEncode rKey
strP = FileRecipient <$> strP <* A.char ':' <*> strP
newFileStore :: IO FileStore
newFileStore = do
files <- TM.emptyIO
recipients <- TM.emptyIO
pure FileStore {files, recipients}
class FileStoreClass s where
type FileStoreConfig s
addFile :: FileStore -> SenderId -> FileInfo -> RoundedFileTime -> ServerEntityStatus -> IO (Either XFTPErrorType ())
addFile FileStore {files} sId fileInfo createdAt status = atomically $
ifM (TM.member sId files) (pure $ Left DUPLICATE_) $ do
f <- newFileRec sId fileInfo createdAt status
TM.insert sId f files
pure $ Right ()
-- Lifecycle
newFileStore :: FileStoreConfig s -> IO s
closeFileStore :: s -> IO ()
newFileRec :: SenderId -> FileInfo -> RoundedFileTime -> ServerEntityStatus -> STM FileRec
newFileRec senderId fileInfo createdAt status = do
recipientIds <- newTVar S.empty
filePath <- newTVar Nothing
fileStatus <- newTVar status
pure FileRec {senderId, fileInfo, filePath, recipientIds, createdAt, fileStatus}
-- File operations
addFile :: s -> SenderId -> FileInfo -> RoundedFileTime -> ServerEntityStatus -> IO (Either XFTPErrorType ())
setFilePath :: s -> SenderId -> FilePath -> IO (Either XFTPErrorType ())
addRecipient :: s -> SenderId -> FileRecipient -> IO (Either XFTPErrorType ())
getFile :: s -> SFileParty p -> XFTPFileId -> IO (Either XFTPErrorType (FileRec, C.APublicAuthKey))
deleteFile :: s -> SenderId -> IO (Either XFTPErrorType ())
blockFile :: s -> SenderId -> BlockingInfo -> Bool -> IO (Either XFTPErrorType ())
deleteRecipient :: s -> RecipientId -> FileRec -> IO ()
ackFile :: s -> RecipientId -> IO (Either XFTPErrorType ())
setFilePath :: FileStore -> SenderId -> FilePath -> IO (Either XFTPErrorType ())
setFilePath st sId fPath = atomically $
withFile st sId $ \FileRec {filePath} -> do
writeTVar filePath (Just fPath)
pure $ Right ()
-- Expiration (with LIMIT for Postgres; called in a loop until empty)
expiredFiles :: s -> Int64 -> Int -> IO [(SenderId, Maybe FilePath, Word32)]
addRecipient :: FileStore -> SenderId -> FileRecipient -> IO (Either XFTPErrorType ())
addRecipient st@FileStore {recipients} senderId (FileRecipient rId rKey) = atomically $
withFile st senderId $ \FileRec {recipientIds} -> do
rIds <- readTVar recipientIds
mem <- TM.member rId recipients
if rId `S.member` rIds || mem
then pure $ Left DUPLICATE_
else do
writeTVar recipientIds $! S.insert rId rIds
TM.insert rId (senderId, rKey) recipients
pure $ Right ()
-- this function must be called after the file is deleted from the file system
deleteFile :: FileStore -> SenderId -> IO (Either XFTPErrorType ())
deleteFile FileStore {files, recipients} senderId = atomically $ do
TM.lookupDelete senderId files >>= \case
Just FileRec {recipientIds} -> do
readTVar recipientIds >>= mapM_ (`TM.delete` recipients)
pure $ Right ()
_ -> pure $ Left AUTH
-- this function must be called after the file is deleted from the file system
blockFile :: FileStore -> SenderId -> BlockingInfo -> Bool -> IO (Either XFTPErrorType ())
blockFile st senderId info _deleted = atomically $
withFile st senderId $ \FileRec {fileStatus} -> do
writeTVar fileStatus $! EntityBlocked info
pure $ Right ()
deleteRecipient :: FileStore -> RecipientId -> FileRec -> IO ()
deleteRecipient FileStore {recipients} rId FileRec {recipientIds} = atomically $ do
TM.delete rId recipients
modifyTVar' recipientIds $ S.delete rId
getFile :: FileStore -> SFileParty p -> XFTPFileId -> IO (Either XFTPErrorType (FileRec, C.APublicAuthKey))
getFile st party fId = atomically $ case party of
SFSender -> withFile st fId $ pure . Right . (\f -> (f, sndKey $ fileInfo f))
SFRecipient ->
TM.lookup fId (recipients st) >>= \case
Just (sId, rKey) -> withFile st sId $ pure . Right . (,rKey)
_ -> pure $ Left AUTH
ackFile :: FileStore -> RecipientId -> IO (Either XFTPErrorType ())
ackFile st@FileStore {recipients} recipientId = atomically $ do
TM.lookupDelete recipientId recipients >>= \case
Just (sId, _) ->
withFile st sId $ \FileRec {recipientIds} -> do
modifyTVar' recipientIds $ S.delete recipientId
pure $ Right ()
_ -> pure $ Left AUTH
expiredFiles :: FileStore -> Int64 -> Int -> IO [(SenderId, Maybe FilePath, Word32)]
expiredFiles FileStore {files} old _limit = do
fs <- readTVarIO files
fmap catMaybes . forM (M.toList fs) $ \(sId, FileRec {fileInfo = FileInfo {size}, filePath, createdAt = RoundedSystemTime createdAt}) ->
if createdAt + fileTimePrecision < old
then do
path <- readTVarIO filePath
pure $ Just (sId, path, size)
else pure Nothing
getUsedStorage :: FileStore -> IO Int64
getUsedStorage FileStore {files} =
M.foldl' (\acc FileRec {fileInfo = FileInfo {size}} -> acc + fromIntegral size) 0 <$> readTVarIO files
getFileCount :: FileStore -> IO Int
getFileCount FileStore {files} = M.size <$> readTVarIO files
withFile :: FileStore -> SenderId -> (FileRec -> STM (Either XFTPErrorType a)) -> STM (Either XFTPErrorType a)
withFile FileStore {files} sId a =
TM.lookup sId files >>= \case
Just f -> a f
_ -> pure $ Left AUTH
-- Storage and stats (for init-time computation)
getUsedStorage :: s -> IO Int64
getFileCount :: s -> IO Int

View File

@@ -0,0 +1,127 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeFamilies #-}
module Simplex.FileTransfer.Server.Store.STM
( STMFileStore (..),
)
where
import Control.Concurrent.STM
import Control.Monad (forM)
import Data.Int (Int64)
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes)
import Data.Set (Set)
import qualified Data.Set as S
import Data.Word (Word32)
import Simplex.FileTransfer.Protocol (FileInfo (..), SFileParty (..), XFTPFileId)
import Simplex.FileTransfer.Server.Store
import Simplex.FileTransfer.Transport (XFTPErrorType (..))
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Protocol (BlockingInfo, RcvPublicAuthKey, RecipientId, SenderId)
import Simplex.Messaging.Server.QueueStore (ServerEntityStatus (..))
import Simplex.Messaging.SystemTime
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util (ifM)
data STMFileStore = STMFileStore
{ files :: TMap SenderId FileRec,
recipients :: TMap RecipientId (SenderId, RcvPublicAuthKey)
}
instance FileStoreClass STMFileStore where
type FileStoreConfig STMFileStore = ()
newFileStore () = do
files <- TM.emptyIO
recipients <- TM.emptyIO
pure STMFileStore {files, recipients}
closeFileStore _ = pure ()
addFile STMFileStore {files} sId fileInfo createdAt status = atomically $
ifM (TM.member sId files) (pure $ Left DUPLICATE_) $ do
f <- newFileRec sId fileInfo createdAt status
TM.insert sId f files
pure $ Right ()
setFilePath st sId fPath = atomically $
withSTMFile st sId $ \FileRec {filePath} -> do
writeTVar filePath (Just fPath)
pure $ Right ()
addRecipient st@STMFileStore {recipients} senderId (FileRecipient rId rKey) = atomically $
withSTMFile st senderId $ \FileRec {recipientIds} -> do
rIds <- readTVar recipientIds
mem <- TM.member rId recipients
if rId `S.member` rIds || mem
then pure $ Left DUPLICATE_
else do
writeTVar recipientIds $! S.insert rId rIds
TM.insert rId (senderId, rKey) recipients
pure $ Right ()
getFile st party fId = atomically $ case party of
SFSender -> withSTMFile st fId $ pure . Right . (\f -> (f, sndKey $ fileInfo f))
SFRecipient ->
TM.lookup fId (recipients st) >>= \case
Just (sId, rKey) -> withSTMFile st sId $ pure . Right . (,rKey)
_ -> pure $ Left AUTH
deleteFile STMFileStore {files, recipients} senderId = atomically $ do
TM.lookupDelete senderId files >>= \case
Just FileRec {recipientIds} -> do
readTVar recipientIds >>= mapM_ (`TM.delete` recipients)
pure $ Right ()
_ -> pure $ Left AUTH
blockFile st senderId info _deleted = atomically $
withSTMFile st senderId $ \FileRec {fileStatus} -> do
writeTVar fileStatus $! EntityBlocked info
pure $ Right ()
deleteRecipient STMFileStore {recipients} rId FileRec {recipientIds} = atomically $ do
TM.delete rId recipients
modifyTVar' recipientIds $ S.delete rId
ackFile st@STMFileStore {recipients} recipientId = atomically $ do
TM.lookupDelete recipientId recipients >>= \case
Just (sId, _) ->
withSTMFile st sId $ \FileRec {recipientIds} -> do
modifyTVar' recipientIds $ S.delete recipientId
pure $ Right ()
_ -> pure $ Left AUTH
expiredFiles STMFileStore {files} old _limit = do
fs <- readTVarIO files
fmap catMaybes . forM (M.toList fs) $ \(sId, FileRec {fileInfo = FileInfo {size}, filePath, createdAt = RoundedSystemTime createdAt}) ->
if createdAt + fileTimePrecision < old
then do
path <- readTVarIO filePath
pure $ Just (sId, path, size)
else pure Nothing
getUsedStorage STMFileStore {files} =
M.foldl' (\acc FileRec {fileInfo = FileInfo {size}} -> acc + fromIntegral size) 0 <$> readTVarIO files
getFileCount STMFileStore {files} = M.size <$> readTVarIO files
-- Internal STM helpers
newFileRec :: SenderId -> FileInfo -> RoundedFileTime -> ServerEntityStatus -> STM FileRec
newFileRec senderId fileInfo createdAt status = do
recipientIds <- newTVar S.empty
filePath <- newTVar Nothing
fileStatus <- newTVar status
pure FileRec {senderId, fileInfo, filePath, recipientIds, createdAt, fileStatus}
withSTMFile :: STMFileStore -> SenderId -> (FileRec -> STM (Either XFTPErrorType a)) -> STM (Either XFTPErrorType a)
withSTMFile STMFileStore {files} sId a =
TM.lookup sId files >>= \case
Just f -> a f
_ -> pure $ Left AUTH

View File

@@ -32,6 +32,7 @@ import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Simplex.FileTransfer.Protocol (FileInfo (..))
import Simplex.FileTransfer.Server.Store
import Simplex.FileTransfer.Server.Store.STM (STMFileStore (..))
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (BlockingInfo, RcvPublicAuthKey, RecipientId, SenderId)
import Simplex.Messaging.Server.QueueStore (ServerEntityStatus (..))
@@ -87,10 +88,10 @@ logBlockFile s fId = logFileStoreRecord s . BlockFile fId
logAckFile :: StoreLog 'WriteMode -> RecipientId -> IO ()
logAckFile s = logFileStoreRecord s . AckFile
readWriteFileStore :: FilePath -> FileStore -> IO (StoreLog 'WriteMode)
readWriteFileStore :: FilePath -> STMFileStore -> IO (StoreLog 'WriteMode)
readWriteFileStore = readWriteStoreLog readFileStore writeFileStore
readFileStore :: FilePath -> FileStore -> IO ()
readFileStore :: FilePath -> STMFileStore -> IO ()
readFileStore f st = mapM_ (addFileLogRecord . LB.toStrict) . LB.lines =<< LB.readFile f
where
addFileLogRecord s = case strDecode s of
@@ -108,8 +109,8 @@ readFileStore f st = mapM_ (addFileLogRecord . LB.toStrict) . LB.lines =<< LB.re
AckFile rId -> ackFile st rId
addRecipients sId rcps = mapM_ (ExceptT . addRecipient st sId) rcps
writeFileStore :: StoreLog 'WriteMode -> FileStore -> IO ()
writeFileStore s FileStore {files, recipients} = do
writeFileStore :: StoreLog 'WriteMode -> STMFileStore -> IO ()
writeFileStore s STMFileStore {files, recipients} = do
allRcps <- readTVarIO recipients
readTVarIO files >>= mapM_ (logFile allRcps)
where