mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-26 13:07:25 +00:00
refactor: add getUsedStorage, getFileCount, expiredFiles store functions
This commit is contained in:
@@ -31,7 +31,6 @@ import qualified Data.ByteString.Char8 as B
|
||||
import Data.Int (Int64)
|
||||
import Data.List.NonEmpty (NonEmpty)
|
||||
import qualified Data.List.NonEmpty as L
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (fromMaybe, isJust)
|
||||
import qualified Data.Text as T
|
||||
import qualified Data.Text.IO as T
|
||||
@@ -642,26 +641,25 @@ expireServerFiles itemDelay expCfg = do
|
||||
us <- asks usedStorage
|
||||
usedStart <- readTVarIO us
|
||||
old <- liftIO $ expireBeforeEpoch expCfg
|
||||
files' <- readTVarIO (files st)
|
||||
logNote $ "Expiration check: " <> tshow (M.size files') <> " files"
|
||||
forM_ (M.keys files') $ \sId -> do
|
||||
mapM_ threadDelay itemDelay
|
||||
atomically (expiredFilePath st sId old)
|
||||
>>= mapM_ (maybeRemove $ delete st sId)
|
||||
filesCount <- liftIO $ getFileCount st
|
||||
logNote $ "Expiration check: " <> tshow filesCount <> " files"
|
||||
expireLoop st us old
|
||||
usedEnd <- readTVarIO us
|
||||
logNote $ "Used " <> mbs usedStart <> " -> " <> mbs usedEnd <> ", " <> mbs (usedStart - usedEnd) <> " reclaimed."
|
||||
where
|
||||
mbs bs = tshow (bs `div` 1048576) <> "mb"
|
||||
maybeRemove del = maybe del (remove del)
|
||||
remove del filePath =
|
||||
ifM
|
||||
(doesFileExist filePath)
|
||||
((removeFile filePath >> del) `catch` \(e :: SomeException) -> logError $ "failed to remove expired file " <> tshow filePath <> ": " <> tshow e)
|
||||
del
|
||||
delete st sId = do
|
||||
withFileLog (`logDeleteFile` sId)
|
||||
void . atomically $ deleteFile st sId -- will not update usedStorage if sId isn't in store
|
||||
incFileStat filesExpired
|
||||
expireLoop st us old = do
|
||||
expired <- liftIO $ expiredFiles st old 10000
|
||||
forM_ expired $ \(sId, filePath_, fileSize) -> do
|
||||
mapM_ threadDelay itemDelay
|
||||
forM_ filePath_ $ \fp ->
|
||||
whenM (doesFileExist fp) $
|
||||
removeFile fp `catch` \(e :: SomeException) -> logError $ "failed to remove expired file " <> tshow fp <> ": " <> tshow e
|
||||
withFileLog (`logDeleteFile` sId)
|
||||
void . atomically $ deleteFile st sId
|
||||
atomically $ modifyTVar' us $ subtract (fromIntegral fileSize)
|
||||
incFileStat filesExpired
|
||||
unless (null expired) $ expireLoop st us old
|
||||
|
||||
randomId :: Int -> M ByteString
|
||||
randomId n = atomically . C.randomBytes n =<< asks random
|
||||
@@ -695,8 +693,8 @@ restoreServerStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStat
|
||||
liftIO (strDecode <$> B.readFile f) >>= \case
|
||||
Right d@FileServerStatsData {_filesCount = statsFilesCount, _filesSize = statsFilesSize} -> do
|
||||
s <- asks serverStats
|
||||
FileStore {files} <- asks store
|
||||
_filesCount <- M.size <$> readTVarIO files
|
||||
st <- asks store
|
||||
_filesCount <- liftIO $ getFileCount st
|
||||
_filesSize <- readTVarIO =<< asks usedStorage
|
||||
liftIO $ setFileServerStats s d {_filesCount, _filesSize}
|
||||
renameFile f $ f <> ".bak"
|
||||
|
||||
@@ -15,7 +15,6 @@ module Simplex.FileTransfer.Server.Env
|
||||
defFileExpirationHours,
|
||||
defaultFileExpiration,
|
||||
newXFTPServerEnv,
|
||||
countUsedStorage,
|
||||
) where
|
||||
|
||||
import Control.Logger.Simple
|
||||
@@ -23,7 +22,6 @@ import Control.Monad
|
||||
import Crypto.Random
|
||||
import Data.Int (Int64)
|
||||
import Data.List.NonEmpty (NonEmpty)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Time.Clock (getCurrentTime)
|
||||
import Data.Word (Word32)
|
||||
import Data.X509.Validation (Fingerprint (..))
|
||||
@@ -115,7 +113,7 @@ newXFTPServerEnv config@XFTPServerConfig {storeLogFile, fileSizeQuota, xftpCrede
|
||||
random <- C.newRandom
|
||||
store <- newFileStore
|
||||
storeLog <- mapM (`readWriteFileStore` store) storeLogFile
|
||||
used <- countUsedStorage <$> readTVarIO (files store)
|
||||
used <- getUsedStorage store
|
||||
usedStorage <- newTVarIO used
|
||||
forM_ fileSizeQuota $ \quota -> do
|
||||
logNote $ "Total / available storage: " <> tshow quota <> " / " <> tshow (quota - used)
|
||||
@@ -126,9 +124,6 @@ newXFTPServerEnv config@XFTPServerConfig {storeLogFile, fileSizeQuota, xftpCrede
|
||||
serverStats <- newFileServerStats =<< getCurrentTime
|
||||
pure XFTPEnv {config, store, usedStorage, storeLog, random, tlsServerCreds, httpServerCreds, serverIdentity = C.KeyHash fp, serverStats}
|
||||
|
||||
countUsedStorage :: M.Map k FileRec -> Int64
|
||||
countUsedStorage = M.foldl' (\acc FileRec {fileInfo = FileInfo {size}} -> acc + fromIntegral size) 0
|
||||
|
||||
data XFTPRequest
|
||||
= XFTPReqNew FileInfo (NonEmpty RcvPublicAuthKey) (Maybe BasicAuth)
|
||||
| XFTPReqCmd XFTPFileId FileRec FileCmd
|
||||
|
||||
@@ -17,18 +17,24 @@ module Simplex.FileTransfer.Server.Store
|
||||
deleteFile,
|
||||
blockFile,
|
||||
deleteRecipient,
|
||||
expiredFilePath,
|
||||
getFile,
|
||||
ackFile,
|
||||
expiredFiles,
|
||||
getUsedStorage,
|
||||
getFileCount,
|
||||
fileTimePrecision,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import Control.Monad (forM)
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import Data.Int (Int64)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (catMaybes)
|
||||
import Data.Set (Set)
|
||||
import qualified Data.Set as S
|
||||
import Data.Word (Word32)
|
||||
import Simplex.FileTransfer.Protocol (FileInfo (..), SFileParty (..), XFTPFileId)
|
||||
import Simplex.FileTransfer.Transport (XFTPErrorType (..))
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
@@ -38,7 +44,7 @@ import Simplex.Messaging.Server.QueueStore (ServerEntityStatus (..))
|
||||
import Simplex.Messaging.SystemTime
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Util (ifM, ($>>=))
|
||||
import Simplex.Messaging.Util (ifM)
|
||||
|
||||
data FileStore = FileStore
|
||||
{ files :: TMap SenderId FileRec,
|
||||
@@ -133,14 +139,6 @@ getFile st party fId = case party of
|
||||
Just (sId, rKey) -> withFile st sId $ pure . Right . (,rKey)
|
||||
_ -> pure $ Left AUTH
|
||||
|
||||
expiredFilePath :: FileStore -> XFTPFileId -> Int64 -> STM (Maybe (Maybe FilePath))
|
||||
expiredFilePath FileStore {files} sId old =
|
||||
TM.lookup sId files
|
||||
$>>= \FileRec {filePath, createdAt = RoundedSystemTime createdAt} ->
|
||||
if createdAt + fileTimePrecision < old
|
||||
then Just <$> readTVar filePath
|
||||
else pure Nothing
|
||||
|
||||
ackFile :: FileStore -> RecipientId -> STM (Either XFTPErrorType ())
|
||||
ackFile st@FileStore {recipients} recipientId = do
|
||||
TM.lookupDelete recipientId recipients >>= \case
|
||||
@@ -150,6 +148,23 @@ ackFile st@FileStore {recipients} recipientId = do
|
||||
pure $ Right ()
|
||||
_ -> pure $ Left AUTH
|
||||
|
||||
expiredFiles :: FileStore -> Int64 -> Int -> IO [(SenderId, Maybe FilePath, Word32)]
|
||||
expiredFiles FileStore {files} old _limit = do
|
||||
fs <- readTVarIO files
|
||||
fmap catMaybes . forM (M.toList fs) $ \(sId, FileRec {fileInfo = FileInfo {size}, filePath, createdAt = RoundedSystemTime createdAt}) ->
|
||||
if createdAt + fileTimePrecision < old
|
||||
then do
|
||||
path <- readTVarIO filePath
|
||||
pure $ Just (sId, path, size)
|
||||
else pure Nothing
|
||||
|
||||
getUsedStorage :: FileStore -> IO Int64
|
||||
getUsedStorage FileStore {files} =
|
||||
M.foldl' (\acc FileRec {fileInfo = FileInfo {size}} -> acc + fromIntegral size) 0 <$> readTVarIO files
|
||||
|
||||
getFileCount :: FileStore -> IO Int
|
||||
getFileCount FileStore {files} = M.size <$> readTVarIO files
|
||||
|
||||
withFile :: FileStore -> SenderId -> (FileRec -> STM (Either XFTPErrorType a)) -> STM (Either XFTPErrorType a)
|
||||
withFile FileStore {files} sId a =
|
||||
TM.lookup sId files >>= \case
|
||||
|
||||
Reference in New Issue
Block a user