mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 16:26:02 +00:00
xftp-server: fix receiveServerFile (#1048)
* xftp-server: fix receiveServerFile * refactor --------- Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>
This commit is contained in:
committed by
GitHub
parent
097cec1c35
commit
db3bddecca
@@ -21,12 +21,12 @@ import qualified Data.ByteString.Base64.URL as B64
|
||||
import Data.ByteString.Builder (byteString)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
import Data.List (intercalate)
|
||||
import Data.List.NonEmpty (NonEmpty)
|
||||
import qualified Data.List.NonEmpty as L
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (fromMaybe, isJust)
|
||||
import qualified Data.Text as T
|
||||
import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime)
|
||||
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
|
||||
@@ -346,35 +346,38 @@ processXFTPRequest HTTP2Body {bodyPart} = \case
|
||||
pure $ FRRcvIds rIds
|
||||
pure $ either FRErr id r
|
||||
receiveServerFile :: FileRec -> M FileResponse
|
||||
receiveServerFile fr@FileRec {senderId, fileInfo} = case bodyPart of
|
||||
-- TODO do not allow repeated file upload
|
||||
receiveServerFile FileRec {senderId, fileInfo = FileInfo {size, digest}, filePath} = case bodyPart of
|
||||
Nothing -> pure $ FRErr SIZE
|
||||
Just getBody -> do
|
||||
-- TODO validate body size before downloading, once it's populated
|
||||
path <- asks $ filesPath . config
|
||||
let fPath = path </> B.unpack (B64.encode senderId)
|
||||
FileInfo {size, digest} = fileInfo
|
||||
withFileLog $ \sl -> logPutFile sl senderId fPath
|
||||
st <- asks store
|
||||
quota_ <- asks $ fileSizeQuota . config
|
||||
-- TODO timeout file upload, remove partially uploaded files
|
||||
stats <- asks serverStats
|
||||
liftIO $
|
||||
runExceptT (receiveFile getBody (XFTPRcvChunkSpec fPath size digest)) >>= \case
|
||||
Right () -> do
|
||||
used <- readTVarIO $ usedStorage st
|
||||
if maybe False (used + fromIntegral size >) quota_
|
||||
then remove fPath $> FRErr QUOTA
|
||||
else do
|
||||
atomically (setFilePath' st fr fPath)
|
||||
atomically $ modifyTVar' (filesUploaded stats) (+ 1)
|
||||
atomically $ modifyTVar' (filesCount stats) (+ 1)
|
||||
atomically $ modifyTVar' (filesSize stats) (+ fromIntegral size)
|
||||
pure FROk
|
||||
Left e -> remove fPath $> FRErr e
|
||||
-- TODO validate body size from request before downloading, once it's populated
|
||||
Just getBody -> checkDuplicate $ ifM reserve receive (pure $ FRErr QUOTA)
|
||||
where
|
||||
remove fPath = whenM (doesFileExist fPath) (removeFile fPath) `catch` logFileError
|
||||
|
||||
reserve = do
|
||||
us <- asks $ usedStorage . store
|
||||
quota <- asks $ fromMaybe maxBound . fileSizeQuota . config
|
||||
atomically . stateTVar us $
|
||||
\used -> let used' = used + fromIntegral size in if used' <= quota then (True, used') else (False, used)
|
||||
receive = do
|
||||
path <- asks $ filesPath . config
|
||||
let fPath = path </> B.unpack (B64.encode senderId)
|
||||
receiveChunk (XFTPRcvChunkSpec fPath size digest) >>= \case
|
||||
Right () -> do
|
||||
stats <- asks serverStats
|
||||
withFileLog $ \sl -> logPutFile sl senderId fPath
|
||||
atomically $ writeTVar filePath (Just fPath)
|
||||
atomically $ modifyTVar' (filesUploaded stats) (+ 1)
|
||||
atomically $ modifyTVar' (filesCount stats) (+ 1)
|
||||
atomically $ modifyTVar' (filesSize stats) (+ fromIntegral size)
|
||||
pure FROk
|
||||
Left e -> do
|
||||
us <- asks $ usedStorage . store
|
||||
atomically . modifyTVar' us $ subtract (fromIntegral size)
|
||||
liftIO $ whenM (doesFileExist fPath) (removeFile fPath) `catch` logFileError
|
||||
pure $ FRErr e
|
||||
receiveChunk spec = do
|
||||
t <- asks $ fileTimeout . config
|
||||
liftIO $ fromMaybe (Left TIMEOUT) <$> timeout t (runExceptT (receiveFile getBody spec) `catchAll_` pure (Left FILE_IO))
|
||||
where
|
||||
checkDuplicate = ifM (isJust <$> readTVarIO filePath) (pure $ FRErr DUPLICATE_)
|
||||
sendServerFile :: FileRec -> RcvPublicDhKey -> M (FileResponse, Maybe ServerFile)
|
||||
sendServerFile FileRec {senderId, filePath, fileInfo = FileInfo {size}} rDhKey = do
|
||||
readTVarIO filePath >>= \case
|
||||
|
||||
@@ -47,6 +47,8 @@ data XFTPServerConfig = XFTPServerConfig
|
||||
newFileBasicAuth :: Maybe BasicAuth,
|
||||
-- | time after which the files can be removed and check interval, seconds
|
||||
fileExpiration :: Maybe ExpirationConfig,
|
||||
-- | timeout to receive file
|
||||
fileTimeout :: Int,
|
||||
-- | time after which inactive clients can be disconnected and check interval, seconds
|
||||
inactiveClientExpiration :: Maybe ExpirationConfig,
|
||||
-- CA certificate private key is not needed for initialization
|
||||
|
||||
@@ -160,6 +160,7 @@ xftpServerCLI cfgPath logPath = do
|
||||
defaultFileExpiration
|
||||
{ ttl = 3600 * readIniDefault defFileExpirationHours "STORE_LOG" "expire_files_hours" ini
|
||||
},
|
||||
fileTimeout = 10 * 60 * 1000000, -- 10 mins to send 4mb chunk
|
||||
inactiveClientExpiration =
|
||||
settingIsOn "INACTIVE_CLIENTS" "disconnect" ini
|
||||
$> ExpirationConfig
|
||||
|
||||
@@ -11,7 +11,6 @@ module Simplex.FileTransfer.Server.Store
|
||||
newFileStore,
|
||||
addFile,
|
||||
setFilePath,
|
||||
setFilePath',
|
||||
addRecipient,
|
||||
deleteFile,
|
||||
deleteRecipient,
|
||||
@@ -79,12 +78,10 @@ newFileRec senderId fileInfo createdAt = do
|
||||
|
||||
setFilePath :: FileStore -> SenderId -> FilePath -> STM (Either XFTPErrorType ())
|
||||
setFilePath st sId fPath =
|
||||
withFile st sId $ \fr -> setFilePath' st fr fPath $> Right ()
|
||||
|
||||
setFilePath' :: FileStore -> FileRec -> FilePath -> STM ()
|
||||
setFilePath' st FileRec {fileInfo, filePath} fPath = do
|
||||
writeTVar filePath (Just fPath)
|
||||
modifyTVar' (usedStorage st) (+ fromIntegral (size fileInfo))
|
||||
withFile st sId $ \FileRec {fileInfo, filePath} -> do
|
||||
writeTVar filePath (Just fPath)
|
||||
modifyTVar' (usedStorage st) (+ fromIntegral (size fileInfo))
|
||||
pure $ Right ()
|
||||
|
||||
addRecipient :: FileStore -> SenderId -> FileRecipient -> STM (Either XFTPErrorType ())
|
||||
addRecipient st@FileStore {recipients} senderId (FileRecipient rId rKey) =
|
||||
|
||||
@@ -157,6 +157,8 @@ data XFTPErrorType
|
||||
HAS_FILE
|
||||
| -- | file IO error
|
||||
FILE_IO
|
||||
| -- | file sending timeout
|
||||
TIMEOUT
|
||||
| -- | bad redirect data
|
||||
REDIRECT {redirectError :: String}
|
||||
| -- | internal server error
|
||||
@@ -188,6 +190,7 @@ instance Encoding XFTPErrorType where
|
||||
NO_FILE -> "NO_FILE"
|
||||
HAS_FILE -> "HAS_FILE"
|
||||
FILE_IO -> "FILE_IO"
|
||||
TIMEOUT -> "TIMEOUT"
|
||||
REDIRECT err -> "REDIRECT " <> smpEncode err
|
||||
INTERNAL -> "INTERNAL"
|
||||
DUPLICATE_ -> "DUPLICATE_"
|
||||
@@ -205,6 +208,7 @@ instance Encoding XFTPErrorType where
|
||||
"NO_FILE" -> pure NO_FILE
|
||||
"HAS_FILE" -> pure HAS_FILE
|
||||
"FILE_IO" -> pure FILE_IO
|
||||
"TIMEOUT" -> pure TIMEOUT
|
||||
"REDIRECT" -> REDIRECT <$> _smpP
|
||||
"INTERNAL" -> pure INTERNAL
|
||||
"DUPLICATE_" -> pure DUPLICATE_
|
||||
|
||||
@@ -106,6 +106,7 @@ testXFTPServerConfig =
|
||||
allowNewFiles = True,
|
||||
newFileBasicAuth = Nothing,
|
||||
fileExpiration = Just defaultFileExpiration,
|
||||
fileTimeout = 10000000,
|
||||
inactiveClientExpiration = Just defaultInactiveClientExpiration,
|
||||
caCertificateFile = "tests/fixtures/ca.crt",
|
||||
privateKeyFile = "tests/fixtures/server.key",
|
||||
|
||||
Reference in New Issue
Block a user