From 6f4bf647ede4cf4db039adf8679da596e1d4318d Mon Sep 17 00:00:00 2001 From: shum Date: Wed, 1 Apr 2026 13:22:14 +0000 Subject: [PATCH] refactor: extract FileStoreClass typeclass, move STM impl to Store.STM --- simplexmq.cabal | 1 + src/Simplex/FileTransfer/Server.hs | 7 +- src/Simplex/FileTransfer/Server/Env.hs | 5 +- src/Simplex/FileTransfer/Server/Store.hs | 152 ++++--------------- src/Simplex/FileTransfer/Server/Store/STM.hs | 127 ++++++++++++++++ src/Simplex/FileTransfer/Server/StoreLog.hs | 9 +- 6 files changed, 166 insertions(+), 135 deletions(-) create mode 100644 src/Simplex/FileTransfer/Server/Store/STM.hs diff --git a/simplexmq.cabal b/simplexmq.cabal index 3ad23df09..329187a81 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -241,6 +241,7 @@ library Simplex.FileTransfer.Server.Prometheus Simplex.FileTransfer.Server.Stats Simplex.FileTransfer.Server.Store + Simplex.FileTransfer.Server.Store.STM Simplex.FileTransfer.Server.StoreLog Simplex.Messaging.Server Simplex.Messaging.Server.CLI diff --git a/src/Simplex/FileTransfer/Server.hs b/src/Simplex/FileTransfer/Server.hs index 711e5e082..6bbf7b85f 100644 --- a/src/Simplex/FileTransfer/Server.hs +++ b/src/Simplex/FileTransfer/Server.hs @@ -51,6 +51,7 @@ import Simplex.FileTransfer.Server.Env import Simplex.FileTransfer.Server.Prometheus import Simplex.FileTransfer.Server.Stats import Simplex.FileTransfer.Server.Store +import Simplex.FileTransfer.Server.Store.STM (STMFileStore) import Simplex.FileTransfer.Server.StoreLog import Simplex.FileTransfer.Transport import qualified Simplex.Messaging.Crypto as C @@ -500,12 +501,12 @@ processXFTPRequest HTTP2Body {bodyPart} = \case let rIds = L.map (\(FileRecipient rId _) -> rId) rcps pure $ FRSndIds sId rIds pure $ either FRErr id r - addFileRetry :: FileStore -> FileInfo -> Int -> RoundedFileTime -> M (Either XFTPErrorType XFTPFileId) + addFileRetry :: STMFileStore -> FileInfo -> Int -> RoundedFileTime -> M (Either XFTPErrorType XFTPFileId) addFileRetry st file n ts = retryAdd n $ \sId -> runExceptT $ do ExceptT $ addFile st sId file ts EntityActive pure sId - addRecipientRetry :: FileStore -> Int -> XFTPFileId -> RcvPublicAuthKey -> M (Either XFTPErrorType FileRecipient) + addRecipientRetry :: STMFileStore -> Int -> XFTPFileId -> RcvPublicAuthKey -> M (Either XFTPErrorType FileRecipient) addRecipientRetry st n sId rpk = retryAdd n $ \rId -> runExceptT $ do let rcp = FileRecipient rId rpk @@ -616,7 +617,7 @@ blockServerFile fr@FileRec {senderId} info = do withFileLog $ \sl -> logBlockFile sl senderId info deleteOrBlockServerFile_ fr filesBlocked $ \st -> blockFile st senderId info True -deleteOrBlockServerFile_ :: FileRec -> (FileServerStats -> IORef Int) -> (FileStore -> IO (Either XFTPErrorType ())) -> M (Either XFTPErrorType ()) +deleteOrBlockServerFile_ :: FileRec -> (FileServerStats -> IORef Int) -> (STMFileStore -> IO (Either XFTPErrorType ())) -> M (Either XFTPErrorType ()) deleteOrBlockServerFile_ FileRec {filePath, fileInfo} stat storeAction = runExceptT $ do path <- readTVarIO filePath stats <- asks serverStats diff --git a/src/Simplex/FileTransfer/Server/Env.hs b/src/Simplex/FileTransfer/Server/Env.hs index dfa3da105..ce1ebecbf 100644 --- a/src/Simplex/FileTransfer/Server/Env.hs +++ b/src/Simplex/FileTransfer/Server/Env.hs @@ -30,6 +30,7 @@ import qualified Network.TLS as T import Simplex.FileTransfer.Protocol (FileCmd, FileInfo (..), XFTPFileId) import Simplex.FileTransfer.Server.Stats import Simplex.FileTransfer.Server.Store +import Simplex.FileTransfer.Server.Store.STM (STMFileStore (..)) import Simplex.FileTransfer.Server.StoreLog import Simplex.FileTransfer.Transport (VersionRangeXFTP) import qualified Simplex.Messaging.Crypto as C @@ -88,7 +89,7 @@ defaultInactiveClientExpiration = data XFTPEnv = XFTPEnv { config :: XFTPServerConfig, - store :: FileStore, + store :: STMFileStore, usedStorage :: TVar Int64, storeLog :: Maybe (StoreLog 'WriteMode), random :: TVar ChaChaDRG, @@ -111,7 +112,7 @@ defaultFileExpiration = newXFTPServerEnv :: XFTPServerConfig -> IO XFTPEnv newXFTPServerEnv config@XFTPServerConfig {storeLogFile, fileSizeQuota, xftpCredentials, httpCredentials} = do random <- C.newRandom - store <- newFileStore + store <- newFileStore () storeLog <- mapM (`readWriteFileStore` store) storeLogFile used <- getUsedStorage store usedStorage <- newTVarIO used diff --git a/src/Simplex/FileTransfer/Server/Store.hs b/src/Simplex/FileTransfer/Server/Store.hs index 2ea460761..a3a4d5795 100644 --- a/src/Simplex/FileTransfer/Server/Store.hs +++ b/src/Simplex/FileTransfer/Server/Store.hs @@ -1,55 +1,30 @@ {-# LANGUAGE DataKinds #-} {-# LANGUAGE GADTs #-} -{-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE TupleSections #-} +{-# LANGUAGE TypeFamilies #-} module Simplex.FileTransfer.Server.Store - ( FileStore (..), + ( FileStoreClass (..), FileRec (..), FileRecipient (..), RoundedFileTime, - newFileStore, - addFile, - setFilePath, - addRecipient, - deleteFile, - blockFile, - deleteRecipient, - getFile, - ackFile, - expiredFiles, - getUsedStorage, - getFileCount, fileTimePrecision, ) where import Control.Concurrent.STM -import Control.Monad (forM) import qualified Data.Attoparsec.ByteString.Char8 as A import Data.Int (Int64) -import qualified Data.Map.Strict as M -import Data.Maybe (catMaybes) import Data.Set (Set) -import qualified Data.Set as S import Data.Word (Word32) -import Simplex.FileTransfer.Protocol (FileInfo (..), SFileParty (..), XFTPFileId) -import Simplex.FileTransfer.Transport (XFTPErrorType (..)) +import Simplex.FileTransfer.Protocol (FileInfo (..), SFileParty, XFTPFileId) +import Simplex.FileTransfer.Transport (XFTPErrorType) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String -import Simplex.Messaging.Protocol (BlockingInfo, RcvPublicAuthKey, RecipientId, SenderId) -import Simplex.Messaging.Server.QueueStore (ServerEntityStatus (..)) +import Simplex.Messaging.Protocol (BlockingInfo, RecipientId, SenderId) +import Simplex.Messaging.Server.QueueStore (ServerEntityStatus) import Simplex.Messaging.SystemTime -import Simplex.Messaging.TMap (TMap) -import qualified Simplex.Messaging.TMap as TM -import Simplex.Messaging.Util (ifM) - -data FileStore = FileStore - { files :: TMap SenderId FileRec, - recipients :: TMap RecipientId (SenderId, RcvPublicAuthKey) - } data FileRec = FileRec { senderId :: SenderId, @@ -65,108 +40,33 @@ type RoundedFileTime = RoundedSystemTime 3600 fileTimePrecision :: Int64 fileTimePrecision = 3600 -- truncate creation time to 1 hour -data FileRecipient = FileRecipient RecipientId RcvPublicAuthKey +data FileRecipient = FileRecipient RecipientId C.APublicAuthKey deriving (Show) instance StrEncoding FileRecipient where strEncode (FileRecipient rId rKey) = strEncode rId <> ":" <> strEncode rKey strP = FileRecipient <$> strP <* A.char ':' <*> strP -newFileStore :: IO FileStore -newFileStore = do - files <- TM.emptyIO - recipients <- TM.emptyIO - pure FileStore {files, recipients} +class FileStoreClass s where + type FileStoreConfig s -addFile :: FileStore -> SenderId -> FileInfo -> RoundedFileTime -> ServerEntityStatus -> IO (Either XFTPErrorType ()) -addFile FileStore {files} sId fileInfo createdAt status = atomically $ - ifM (TM.member sId files) (pure $ Left DUPLICATE_) $ do - f <- newFileRec sId fileInfo createdAt status - TM.insert sId f files - pure $ Right () + -- Lifecycle + newFileStore :: FileStoreConfig s -> IO s + closeFileStore :: s -> IO () -newFileRec :: SenderId -> FileInfo -> RoundedFileTime -> ServerEntityStatus -> STM FileRec -newFileRec senderId fileInfo createdAt status = do - recipientIds <- newTVar S.empty - filePath <- newTVar Nothing - fileStatus <- newTVar status - pure FileRec {senderId, fileInfo, filePath, recipientIds, createdAt, fileStatus} + -- File operations + addFile :: s -> SenderId -> FileInfo -> RoundedFileTime -> ServerEntityStatus -> IO (Either XFTPErrorType ()) + setFilePath :: s -> SenderId -> FilePath -> IO (Either XFTPErrorType ()) + addRecipient :: s -> SenderId -> FileRecipient -> IO (Either XFTPErrorType ()) + getFile :: s -> SFileParty p -> XFTPFileId -> IO (Either XFTPErrorType (FileRec, C.APublicAuthKey)) + deleteFile :: s -> SenderId -> IO (Either XFTPErrorType ()) + blockFile :: s -> SenderId -> BlockingInfo -> Bool -> IO (Either XFTPErrorType ()) + deleteRecipient :: s -> RecipientId -> FileRec -> IO () + ackFile :: s -> RecipientId -> IO (Either XFTPErrorType ()) -setFilePath :: FileStore -> SenderId -> FilePath -> IO (Either XFTPErrorType ()) -setFilePath st sId fPath = atomically $ - withFile st sId $ \FileRec {filePath} -> do - writeTVar filePath (Just fPath) - pure $ Right () + -- Expiration (with LIMIT for Postgres; called in a loop until empty) + expiredFiles :: s -> Int64 -> Int -> IO [(SenderId, Maybe FilePath, Word32)] -addRecipient :: FileStore -> SenderId -> FileRecipient -> IO (Either XFTPErrorType ()) -addRecipient st@FileStore {recipients} senderId (FileRecipient rId rKey) = atomically $ - withFile st senderId $ \FileRec {recipientIds} -> do - rIds <- readTVar recipientIds - mem <- TM.member rId recipients - if rId `S.member` rIds || mem - then pure $ Left DUPLICATE_ - else do - writeTVar recipientIds $! S.insert rId rIds - TM.insert rId (senderId, rKey) recipients - pure $ Right () - --- this function must be called after the file is deleted from the file system -deleteFile :: FileStore -> SenderId -> IO (Either XFTPErrorType ()) -deleteFile FileStore {files, recipients} senderId = atomically $ do - TM.lookupDelete senderId files >>= \case - Just FileRec {recipientIds} -> do - readTVar recipientIds >>= mapM_ (`TM.delete` recipients) - pure $ Right () - _ -> pure $ Left AUTH - --- this function must be called after the file is deleted from the file system -blockFile :: FileStore -> SenderId -> BlockingInfo -> Bool -> IO (Either XFTPErrorType ()) -blockFile st senderId info _deleted = atomically $ - withFile st senderId $ \FileRec {fileStatus} -> do - writeTVar fileStatus $! EntityBlocked info - pure $ Right () - -deleteRecipient :: FileStore -> RecipientId -> FileRec -> IO () -deleteRecipient FileStore {recipients} rId FileRec {recipientIds} = atomically $ do - TM.delete rId recipients - modifyTVar' recipientIds $ S.delete rId - -getFile :: FileStore -> SFileParty p -> XFTPFileId -> IO (Either XFTPErrorType (FileRec, C.APublicAuthKey)) -getFile st party fId = atomically $ case party of - SFSender -> withFile st fId $ pure . Right . (\f -> (f, sndKey $ fileInfo f)) - SFRecipient -> - TM.lookup fId (recipients st) >>= \case - Just (sId, rKey) -> withFile st sId $ pure . Right . (,rKey) - _ -> pure $ Left AUTH - -ackFile :: FileStore -> RecipientId -> IO (Either XFTPErrorType ()) -ackFile st@FileStore {recipients} recipientId = atomically $ do - TM.lookupDelete recipientId recipients >>= \case - Just (sId, _) -> - withFile st sId $ \FileRec {recipientIds} -> do - modifyTVar' recipientIds $ S.delete recipientId - pure $ Right () - _ -> pure $ Left AUTH - -expiredFiles :: FileStore -> Int64 -> Int -> IO [(SenderId, Maybe FilePath, Word32)] -expiredFiles FileStore {files} old _limit = do - fs <- readTVarIO files - fmap catMaybes . forM (M.toList fs) $ \(sId, FileRec {fileInfo = FileInfo {size}, filePath, createdAt = RoundedSystemTime createdAt}) -> - if createdAt + fileTimePrecision < old - then do - path <- readTVarIO filePath - pure $ Just (sId, path, size) - else pure Nothing - -getUsedStorage :: FileStore -> IO Int64 -getUsedStorage FileStore {files} = - M.foldl' (\acc FileRec {fileInfo = FileInfo {size}} -> acc + fromIntegral size) 0 <$> readTVarIO files - -getFileCount :: FileStore -> IO Int -getFileCount FileStore {files} = M.size <$> readTVarIO files - -withFile :: FileStore -> SenderId -> (FileRec -> STM (Either XFTPErrorType a)) -> STM (Either XFTPErrorType a) -withFile FileStore {files} sId a = - TM.lookup sId files >>= \case - Just f -> a f - _ -> pure $ Left AUTH + -- Storage and stats (for init-time computation) + getUsedStorage :: s -> IO Int64 + getFileCount :: s -> IO Int diff --git a/src/Simplex/FileTransfer/Server/Store/STM.hs b/src/Simplex/FileTransfer/Server/Store/STM.hs new file mode 100644 index 000000000..7859d06aa --- /dev/null +++ b/src/Simplex/FileTransfer/Server/Store/STM.hs @@ -0,0 +1,127 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE TupleSections #-} +{-# LANGUAGE TypeFamilies #-} + +module Simplex.FileTransfer.Server.Store.STM + ( STMFileStore (..), + ) +where + +import Control.Concurrent.STM +import Control.Monad (forM) +import Data.Int (Int64) +import qualified Data.Map.Strict as M +import Data.Maybe (catMaybes) +import Data.Set (Set) +import qualified Data.Set as S +import Data.Word (Word32) +import Simplex.FileTransfer.Protocol (FileInfo (..), SFileParty (..), XFTPFileId) +import Simplex.FileTransfer.Server.Store +import Simplex.FileTransfer.Transport (XFTPErrorType (..)) +import qualified Simplex.Messaging.Crypto as C +import Simplex.Messaging.Protocol (BlockingInfo, RcvPublicAuthKey, RecipientId, SenderId) +import Simplex.Messaging.Server.QueueStore (ServerEntityStatus (..)) +import Simplex.Messaging.SystemTime +import Simplex.Messaging.TMap (TMap) +import qualified Simplex.Messaging.TMap as TM +import Simplex.Messaging.Util (ifM) + +data STMFileStore = STMFileStore + { files :: TMap SenderId FileRec, + recipients :: TMap RecipientId (SenderId, RcvPublicAuthKey) + } + +instance FileStoreClass STMFileStore where + type FileStoreConfig STMFileStore = () + + newFileStore () = do + files <- TM.emptyIO + recipients <- TM.emptyIO + pure STMFileStore {files, recipients} + + closeFileStore _ = pure () + + addFile STMFileStore {files} sId fileInfo createdAt status = atomically $ + ifM (TM.member sId files) (pure $ Left DUPLICATE_) $ do + f <- newFileRec sId fileInfo createdAt status + TM.insert sId f files + pure $ Right () + + setFilePath st sId fPath = atomically $ + withSTMFile st sId $ \FileRec {filePath} -> do + writeTVar filePath (Just fPath) + pure $ Right () + + addRecipient st@STMFileStore {recipients} senderId (FileRecipient rId rKey) = atomically $ + withSTMFile st senderId $ \FileRec {recipientIds} -> do + rIds <- readTVar recipientIds + mem <- TM.member rId recipients + if rId `S.member` rIds || mem + then pure $ Left DUPLICATE_ + else do + writeTVar recipientIds $! S.insert rId rIds + TM.insert rId (senderId, rKey) recipients + pure $ Right () + + getFile st party fId = atomically $ case party of + SFSender -> withSTMFile st fId $ pure . Right . (\f -> (f, sndKey $ fileInfo f)) + SFRecipient -> + TM.lookup fId (recipients st) >>= \case + Just (sId, rKey) -> withSTMFile st sId $ pure . Right . (,rKey) + _ -> pure $ Left AUTH + + deleteFile STMFileStore {files, recipients} senderId = atomically $ do + TM.lookupDelete senderId files >>= \case + Just FileRec {recipientIds} -> do + readTVar recipientIds >>= mapM_ (`TM.delete` recipients) + pure $ Right () + _ -> pure $ Left AUTH + + blockFile st senderId info _deleted = atomically $ + withSTMFile st senderId $ \FileRec {fileStatus} -> do + writeTVar fileStatus $! EntityBlocked info + pure $ Right () + + deleteRecipient STMFileStore {recipients} rId FileRec {recipientIds} = atomically $ do + TM.delete rId recipients + modifyTVar' recipientIds $ S.delete rId + + ackFile st@STMFileStore {recipients} recipientId = atomically $ do + TM.lookupDelete recipientId recipients >>= \case + Just (sId, _) -> + withSTMFile st sId $ \FileRec {recipientIds} -> do + modifyTVar' recipientIds $ S.delete recipientId + pure $ Right () + _ -> pure $ Left AUTH + + expiredFiles STMFileStore {files} old _limit = do + fs <- readTVarIO files + fmap catMaybes . forM (M.toList fs) $ \(sId, FileRec {fileInfo = FileInfo {size}, filePath, createdAt = RoundedSystemTime createdAt}) -> + if createdAt + fileTimePrecision < old + then do + path <- readTVarIO filePath + pure $ Just (sId, path, size) + else pure Nothing + + getUsedStorage STMFileStore {files} = + M.foldl' (\acc FileRec {fileInfo = FileInfo {size}} -> acc + fromIntegral size) 0 <$> readTVarIO files + + getFileCount STMFileStore {files} = M.size <$> readTVarIO files + +-- Internal STM helpers + +newFileRec :: SenderId -> FileInfo -> RoundedFileTime -> ServerEntityStatus -> STM FileRec +newFileRec senderId fileInfo createdAt status = do + recipientIds <- newTVar S.empty + filePath <- newTVar Nothing + fileStatus <- newTVar status + pure FileRec {senderId, fileInfo, filePath, recipientIds, createdAt, fileStatus} + +withSTMFile :: STMFileStore -> SenderId -> (FileRec -> STM (Either XFTPErrorType a)) -> STM (Either XFTPErrorType a) +withSTMFile STMFileStore {files} sId a = + TM.lookup sId files >>= \case + Just f -> a f + _ -> pure $ Left AUTH diff --git a/src/Simplex/FileTransfer/Server/StoreLog.hs b/src/Simplex/FileTransfer/Server/StoreLog.hs index 8175aca73..dc65e4a22 100644 --- a/src/Simplex/FileTransfer/Server/StoreLog.hs +++ b/src/Simplex/FileTransfer/Server/StoreLog.hs @@ -32,6 +32,7 @@ import Data.Map.Strict (Map) import qualified Data.Map.Strict as M import Simplex.FileTransfer.Protocol (FileInfo (..)) import Simplex.FileTransfer.Server.Store +import Simplex.FileTransfer.Server.Store.STM (STMFileStore (..)) import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (BlockingInfo, RcvPublicAuthKey, RecipientId, SenderId) import Simplex.Messaging.Server.QueueStore (ServerEntityStatus (..)) @@ -87,10 +88,10 @@ logBlockFile s fId = logFileStoreRecord s . BlockFile fId logAckFile :: StoreLog 'WriteMode -> RecipientId -> IO () logAckFile s = logFileStoreRecord s . AckFile -readWriteFileStore :: FilePath -> FileStore -> IO (StoreLog 'WriteMode) +readWriteFileStore :: FilePath -> STMFileStore -> IO (StoreLog 'WriteMode) readWriteFileStore = readWriteStoreLog readFileStore writeFileStore -readFileStore :: FilePath -> FileStore -> IO () +readFileStore :: FilePath -> STMFileStore -> IO () readFileStore f st = mapM_ (addFileLogRecord . LB.toStrict) . LB.lines =<< LB.readFile f where addFileLogRecord s = case strDecode s of @@ -108,8 +109,8 @@ readFileStore f st = mapM_ (addFileLogRecord . LB.toStrict) . LB.lines =<< LB.re AckFile rId -> ackFile st rId addRecipients sId rcps = mapM_ (ExceptT . addRecipient st sId) rcps -writeFileStore :: StoreLog 'WriteMode -> FileStore -> IO () -writeFileStore s FileStore {files, recipients} = do +writeFileStore :: StoreLog 'WriteMode -> STMFileStore -> IO () +writeFileStore s STMFileStore {files, recipients} = do allRcps <- readTVarIO recipients readTVarIO files >>= mapM_ (logFile allRcps) where