refactor: make XFTPEnv and server polymorphic over FileStoreClass

This commit is contained in:
shum
2026-04-01 13:34:35 +00:00
parent 6f4bf647ed
commit ff254b451b
4 changed files with 65 additions and 56 deletions

View File

@@ -51,7 +51,6 @@ import Simplex.FileTransfer.Server.Env
import Simplex.FileTransfer.Server.Prometheus
import Simplex.FileTransfer.Server.Stats
import Simplex.FileTransfer.Server.Store
import Simplex.FileTransfer.Server.Store.STM (STMFileStore)
import Simplex.FileTransfer.Server.StoreLog
import Simplex.FileTransfer.Transport
import qualified Simplex.Messaging.Crypto as C
@@ -88,7 +87,7 @@ import UnliftIO.Concurrent (threadDelay)
import UnliftIO.Directory (canonicalizePath, doesFileExist, removeFile, renameFile)
import qualified UnliftIO.Exception as E
type M a = ReaderT XFTPEnv IO a
type M s a = ReaderT (XFTPEnv s) IO a
data XFTPTransportRequest = XFTPTransportRequest
{ thParams :: THandleParamsXFTP 'TServer,
@@ -112,19 +111,19 @@ corsPreflightHeaders =
("Access-Control-Max-Age", "86400")
]
runXFTPServer :: XFTPServerConfig -> IO ()
runXFTPServer cfg = do
runXFTPServer :: FileStoreClass s => XFTPStoreConfig s -> XFTPServerConfig -> IO ()
runXFTPServer storeCfg cfg = do
started <- newEmptyTMVarIO
runXFTPServerBlocking started cfg
runXFTPServerBlocking started storeCfg cfg
runXFTPServerBlocking :: TMVar Bool -> XFTPServerConfig -> IO ()
runXFTPServerBlocking started cfg = newXFTPServerEnv cfg >>= runReaderT (xftpServer cfg started)
runXFTPServerBlocking :: FileStoreClass s => TMVar Bool -> XFTPStoreConfig s -> XFTPServerConfig -> IO ()
runXFTPServerBlocking started storeCfg cfg = newXFTPServerEnv storeCfg cfg >>= runReaderT (xftpServer cfg started)
data Handshake
= HandshakeSent C.PrivateKeyX25519
| HandshakeAccepted (THandleParams XFTPVersion 'TServer)
xftpServer :: XFTPServerConfig -> TMVar Bool -> M ()
xftpServer :: forall s. FileStoreClass s => XFTPServerConfig -> TMVar Bool -> M s ()
xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpiration, fileExpiration, xftpServerVRange} started = do
mapM_ (expireServerFiles Nothing) fileExpiration
restoreServerStats
@@ -137,7 +136,7 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira
)
`finally` stopServer
where
runServer :: M ()
runServer :: M s ()
runServer = do
srvCreds@(chain, pk) <- asks tlsServerCreds
httpCreds_ <- asks httpServerCreds
@@ -168,7 +167,7 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira
Nothing -> pure ()
Just thParams -> processRequest req0 {thParams}
| otherwise -> liftIO . sendResponse $ H.responseNoBody N.ok200 (corsHeaders addCORS')
xftpServerHandshakeV1 :: X.CertificateChain -> C.APrivateSignKey -> TMap SessionId Handshake -> XFTPTransportRequest -> M (Maybe (THandleParams XFTPVersion 'TServer))
xftpServerHandshakeV1 :: X.CertificateChain -> C.APrivateSignKey -> TMap SessionId Handshake -> XFTPTransportRequest -> M s (Maybe (THandleParams XFTPVersion 'TServer))
xftpServerHandshakeV1 chain serverSignKey sessions XFTPTransportRequest {thParams = thParams0@THandleParams {sessionId}, request, reqBody = HTTP2Body {bodyHead}, sendResponse, sniUsed, addCORS} = do
s <- atomically $ TM.lookup sessionId sessions
r <- runExceptT $ case s of
@@ -227,39 +226,41 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira
liftIO . sendResponse $ H.responseNoBody N.ok200 (corsHeaders addCORS)
pure Nothing
Nothing -> throwE HANDSHAKE
sendError :: XFTPErrorType -> M (Maybe (THandleParams XFTPVersion 'TServer))
sendError :: XFTPErrorType -> M s (Maybe (THandleParams XFTPVersion 'TServer))
sendError err = do
runExceptT (encodeXftp err) >>= \case
Right bs -> liftIO . sendResponse $ H.responseBuilder N.ok200 (corsHeaders addCORS) bs
Left _ -> logError $ "Error encoding handshake error: " <> tshow err
pure Nothing
encodeXftp :: Encoding a => a -> ExceptT XFTPErrorType (ReaderT XFTPEnv IO) Builder
encodeXftp :: Encoding a => a -> ExceptT XFTPErrorType (ReaderT (XFTPEnv s) IO) Builder
encodeXftp a = byteString <$> liftHS (C.pad (smpEncode a) xftpBlockSize)
liftHS = liftEitherWith (const HANDSHAKE)
stopServer :: M ()
stopServer :: M s ()
stopServer = do
withFileLog closeStoreLog
st <- asks store
liftIO $ closeFileStore st
saveServerStats
logNote "Server stopped"
expireFilesThread_ :: XFTPServerConfig -> [M ()]
expireFilesThread_ :: XFTPServerConfig -> [M s ()]
expireFilesThread_ XFTPServerConfig {fileExpiration = Just fileExp} = [expireFiles fileExp]
expireFilesThread_ _ = []
expireFiles :: ExpirationConfig -> M ()
expireFiles :: ExpirationConfig -> M s ()
expireFiles expCfg = do
let interval = checkInterval expCfg * 1000000
forever $ do
liftIO $ threadDelay' interval
expireServerFiles (Just 100000) expCfg
serverStatsThread_ :: XFTPServerConfig -> [M ()]
serverStatsThread_ :: XFTPServerConfig -> [M s ()]
serverStatsThread_ XFTPServerConfig {logStatsInterval = Just interval, logStatsStartTime, serverStatsLogFile} =
[logServerStats logStatsStartTime interval serverStatsLogFile]
serverStatsThread_ _ = []
logServerStats :: Int64 -> Int64 -> FilePath -> M ()
logServerStats :: Int64 -> Int64 -> FilePath -> M s ()
logServerStats startAt logInterval statsFilePath = do
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath
@@ -300,12 +301,12 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira
]
liftIO $ threadDelay' interval
prometheusMetricsThread_ :: XFTPServerConfig -> [M ()]
prometheusMetricsThread_ :: XFTPServerConfig -> [M s ()]
prometheusMetricsThread_ XFTPServerConfig {prometheusInterval = Just interval, prometheusMetricsFile} =
[savePrometheusMetrics interval prometheusMetricsFile]
prometheusMetricsThread_ _ = []
savePrometheusMetrics :: Int -> FilePath -> M ()
savePrometheusMetrics :: Int -> FilePath -> M s ()
savePrometheusMetrics saveInterval metricsFile = do
labelMyThread "savePrometheusMetrics"
liftIO $ putStrLn $ "Prometheus metrics saved every " <> show saveInterval <> " seconds to " <> metricsFile
@@ -324,11 +325,11 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira
let fd = periodStatDataCounts $ _filesDownloaded d
pure FileServerMetrics {statsData = d, filesDownloadedPeriods = fd, rtsOptions}
controlPortThread_ :: XFTPServerConfig -> [M ()]
controlPortThread_ :: XFTPServerConfig -> [M s ()]
controlPortThread_ XFTPServerConfig {controlPort = Just port} = [runCPServer port]
controlPortThread_ _ = []
runCPServer :: ServiceName -> M ()
runCPServer :: ServiceName -> M s ()
runCPServer port = do
cpStarted <- newEmptyTMVarIO
u <- askUnliftIO
@@ -336,7 +337,7 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira
labelMyThread "control port server"
runLocalTCPServer cpStarted port $ runCPClient u
where
runCPClient :: UnliftIO (ReaderT XFTPEnv IO) -> Socket -> IO ()
runCPClient :: UnliftIO (ReaderT (XFTPEnv s) IO) -> Socket -> IO ()
runCPClient u sock = do
labelMyThread "control port client"
h <- socketToHandle sock ReadWriteMode
@@ -395,7 +396,7 @@ data ServerFile = ServerFile
sbState :: LC.SbState
}
processRequest :: XFTPTransportRequest -> M ()
processRequest :: FileStoreClass s => XFTPTransportRequest -> M s ()
processRequest XFTPTransportRequest {thParams, reqBody = body@HTTP2Body {bodyHead}, sendResponse, addCORS}
| B.length bodyHead /= xftpBlockSize = sendXFTPResponse ("", NoEntity, FRErr BLOCK) Nothing
| otherwise =
@@ -430,7 +431,7 @@ processRequest XFTPTransportRequest {thParams, reqBody = body@HTTP2Body {bodyHea
done
#ifdef slow_servers
randomDelay :: M ()
randomDelay :: M s ()
randomDelay = do
d <- asks $ responseDelay . config
when (d > 0) $ do
@@ -440,14 +441,14 @@ randomDelay = do
data VerificationResult = VRVerified XFTPRequest | VRFailed XFTPErrorType
verifyXFTPTransmission :: Maybe (THandleAuth 'TServer) -> SignedTransmission FileCmd -> M VerificationResult
verifyXFTPTransmission :: forall s. FileStoreClass s => Maybe (THandleAuth 'TServer) -> SignedTransmission FileCmd -> M s VerificationResult
verifyXFTPTransmission thAuth (tAuth, authorized, (corrId, fId, cmd)) =
case cmd of
FileCmd SFSender (FNEW file rcps auth') -> pure $ XFTPReqNew file rcps auth' `verifyWith` sndKey file
FileCmd SFRecipient PING -> pure $ VRVerified XFTPReqPing
FileCmd party _ -> verifyCmd party
where
verifyCmd :: SFileParty p -> M VerificationResult
verifyCmd :: SFileParty p -> M s VerificationResult
verifyCmd party = do
st <- asks store
liftIO (getFile st party fId) >>= \case
@@ -463,7 +464,7 @@ verifyXFTPTransmission thAuth (tAuth, authorized, (corrId, fId, cmd)) =
-- TODO verify with DH authorization
req `verifyWith` k = if verifyCmdAuthorization thAuth tAuth authorized corrId k then VRVerified req else VRFailed AUTH
processXFTPRequest :: HTTP2Body -> XFTPRequest -> M (FileResponse, Maybe ServerFile)
processXFTPRequest :: forall s. FileStoreClass s => HTTP2Body -> XFTPRequest -> M s (FileResponse, Maybe ServerFile)
processXFTPRequest HTTP2Body {bodyPart} = \case
XFTPReqNew file rks auth -> noFile =<< ifM allowNew (createFile file rks) (pure $ FRErr AUTH)
where
@@ -482,7 +483,7 @@ processXFTPRequest HTTP2Body {bodyPart} = \case
XFTPReqPing -> noFile FRPong
where
noFile resp = pure (resp, Nothing)
createFile :: FileInfo -> NonEmpty RcvPublicAuthKey -> M FileResponse
createFile :: FileInfo -> NonEmpty RcvPublicAuthKey -> M s FileResponse
createFile file rks = do
st <- asks store
r <- runExceptT $ do
@@ -501,25 +502,25 @@ processXFTPRequest HTTP2Body {bodyPart} = \case
let rIds = L.map (\(FileRecipient rId _) -> rId) rcps
pure $ FRSndIds sId rIds
pure $ either FRErr id r
addFileRetry :: STMFileStore -> FileInfo -> Int -> RoundedFileTime -> M (Either XFTPErrorType XFTPFileId)
addFileRetry :: s -> FileInfo -> Int -> RoundedFileTime -> M s (Either XFTPErrorType XFTPFileId)
addFileRetry st file n ts =
retryAdd n $ \sId -> runExceptT $ do
ExceptT $ addFile st sId file ts EntityActive
pure sId
addRecipientRetry :: STMFileStore -> Int -> XFTPFileId -> RcvPublicAuthKey -> M (Either XFTPErrorType FileRecipient)
addRecipientRetry :: s -> Int -> XFTPFileId -> RcvPublicAuthKey -> M s (Either XFTPErrorType FileRecipient)
addRecipientRetry st n sId rpk =
retryAdd n $ \rId -> runExceptT $ do
let rcp = FileRecipient rId rpk
ExceptT $ addRecipient st sId rcp
pure rcp
retryAdd :: Int -> (XFTPFileId -> IO (Either XFTPErrorType a)) -> M (Either XFTPErrorType a)
retryAdd :: Int -> (XFTPFileId -> IO (Either XFTPErrorType a)) -> M s (Either XFTPErrorType a)
retryAdd 0 _ = pure $ Left INTERNAL
retryAdd n add = do
fId <- getFileId
liftIO (add fId) >>= \case
Left DUPLICATE_ -> retryAdd (n - 1) add
r -> pure r
addRecipients :: XFTPFileId -> NonEmpty RcvPublicAuthKey -> M FileResponse
addRecipients :: XFTPFileId -> NonEmpty RcvPublicAuthKey -> M s FileResponse
addRecipients sId rks = do
st <- asks store
r <- runExceptT $ do
@@ -530,7 +531,7 @@ processXFTPRequest HTTP2Body {bodyPart} = \case
let rIds = L.map (\(FileRecipient rId _) -> rId) rcps
pure $ FRRcvIds rIds
pure $ either FRErr id r
receiveServerFile :: FileRec -> M FileResponse
receiveServerFile :: FileRec -> M s FileResponse
receiveServerFile FileRec {senderId, fileInfo = FileInfo {size, digest}, filePath} = case bodyPart of
Nothing -> pure $ FRErr SIZE
-- TODO validate body size from request before downloading, once it's populated
@@ -573,7 +574,7 @@ processXFTPRequest HTTP2Body {bodyPart} = \case
receiveChunk spec = do
t <- asks $ fileTimeout . config
liftIO $ fromMaybe (Left TIMEOUT) <$> timeout t (runExceptT $ receiveFile getBody spec)
sendServerFile :: FileRec -> RcvPublicDhKey -> M (FileResponse, Maybe ServerFile)
sendServerFile :: FileRec -> RcvPublicDhKey -> M s (FileResponse, Maybe ServerFile)
sendServerFile FileRec {senderId, filePath, fileInfo = FileInfo {size}} rDhKey = do
readTVarIO filePath >>= \case
Just path -> ifM (doesFileExist path) sendFile (pure (FRErr AUTH, Nothing))
@@ -592,13 +593,13 @@ processXFTPRequest HTTP2Body {bodyPart} = \case
_ -> pure (FRErr INTERNAL, Nothing)
_ -> pure (FRErr NO_FILE, Nothing)
deleteServerFile :: FileRec -> M FileResponse
deleteServerFile :: FileRec -> M s FileResponse
deleteServerFile fr = either FRErr (\() -> FROk) <$> deleteServerFile_ fr
logFileError :: SomeException -> IO ()
logFileError e = logError $ "Error deleting file: " <> tshow e
ackFileReception :: RecipientId -> FileRec -> M FileResponse
ackFileReception :: RecipientId -> FileRec -> M s FileResponse
ackFileReception rId fr = do
withFileLog (`logAckFile` rId)
st <- asks store
@@ -606,18 +607,18 @@ processXFTPRequest HTTP2Body {bodyPart} = \case
incFileStat fileDownloadAcks
pure FROk
deleteServerFile_ :: FileRec -> M (Either XFTPErrorType ())
deleteServerFile_ :: FileStoreClass s => FileRec -> M s (Either XFTPErrorType ())
deleteServerFile_ fr@FileRec {senderId} = do
withFileLog (`logDeleteFile` senderId)
deleteOrBlockServerFile_ fr filesDeleted (`deleteFile` senderId)
-- this also deletes the file from storage, but doesn't include it in delete statistics
blockServerFile :: FileRec -> BlockingInfo -> M (Either XFTPErrorType ())
blockServerFile :: FileStoreClass s => FileRec -> BlockingInfo -> M s (Either XFTPErrorType ())
blockServerFile fr@FileRec {senderId} info = do
withFileLog $ \sl -> logBlockFile sl senderId info
deleteOrBlockServerFile_ fr filesBlocked $ \st -> blockFile st senderId info True
deleteOrBlockServerFile_ :: FileRec -> (FileServerStats -> IORef Int) -> (STMFileStore -> IO (Either XFTPErrorType ())) -> M (Either XFTPErrorType ())
deleteOrBlockServerFile_ :: FileStoreClass s => FileRec -> (FileServerStats -> IORef Int) -> (s -> IO (Either XFTPErrorType ())) -> M s (Either XFTPErrorType ())
deleteOrBlockServerFile_ FileRec {filePath, fileInfo} stat storeAction = runExceptT $ do
path <- readTVarIO filePath
stats <- asks serverStats
@@ -636,7 +637,7 @@ deleteOrBlockServerFile_ FileRec {filePath, fileInfo} stat storeAction = runExce
getFileTime :: IO RoundedFileTime
getFileTime = getRoundedSystemTime
expireServerFiles :: Maybe Int -> ExpirationConfig -> M ()
expireServerFiles :: FileStoreClass s => Maybe Int -> ExpirationConfig -> M s ()
expireServerFiles itemDelay expCfg = do
st <- asks store
us <- asks usedStorage
@@ -662,21 +663,21 @@ expireServerFiles itemDelay expCfg = do
incFileStat filesExpired
unless (null expired) $ expireLoop st us old
randomId :: Int -> M ByteString
randomId :: Int -> M s ByteString
randomId n = atomically . C.randomBytes n =<< asks random
getFileId :: M XFTPFileId
getFileId :: M s XFTPFileId
getFileId = fmap EntityId . randomId =<< asks (fileIdSize . config)
withFileLog :: (StoreLog 'WriteMode -> IO a) -> M ()
withFileLog :: (StoreLog 'WriteMode -> IO a) -> M s ()
withFileLog action = liftIO . mapM_ action =<< asks storeLog
incFileStat :: (FileServerStats -> IORef Int) -> M ()
incFileStat :: (FileServerStats -> IORef Int) -> M s ()
incFileStat statSel = do
stats <- asks serverStats
liftIO $ atomicModifyIORef'_ (statSel stats) (+ 1)
saveServerStats :: M ()
saveServerStats :: M s ()
saveServerStats =
asks (serverStatsBackupFile . config)
>>= mapM_ (\f -> asks serverStats >>= liftIO . getFileServerStatsData >>= liftIO . saveStats f)
@@ -686,7 +687,7 @@ saveServerStats =
B.writeFile f $ strEncode stats
logNote "server stats saved"
restoreServerStats :: M ()
restoreServerStats :: FileStoreClass s => M s ()
restoreServerStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStats
where
restoreStats f = whenM (doesFileExist f) $ do

View File

@@ -9,6 +9,7 @@
module Simplex.FileTransfer.Server.Env
( XFTPServerConfig (..),
XFTPStoreConfig (..),
XFTPEnv (..),
XFTPRequest (..),
defaultInactiveClientExpiration,
@@ -87,9 +88,12 @@ defaultInactiveClientExpiration =
checkInterval = 3600 -- seconds, 1 hours
}
data XFTPEnv = XFTPEnv
data XFTPStoreConfig s where
XSCMemory :: Maybe FilePath -> XFTPStoreConfig STMFileStore
data XFTPEnv s = XFTPEnv
{ config :: XFTPServerConfig,
store :: STMFileStore,
store :: s,
usedStorage :: TVar Int64,
storeLog :: Maybe (StoreLog 'WriteMode),
random :: TVar ChaChaDRG,
@@ -109,11 +113,14 @@ defaultFileExpiration =
checkInterval = 2 * 3600 -- seconds, 2 hours
}
newXFTPServerEnv :: XFTPServerConfig -> IO XFTPEnv
newXFTPServerEnv config@XFTPServerConfig {storeLogFile, fileSizeQuota, xftpCredentials, httpCredentials} = do
newXFTPServerEnv :: FileStoreClass s => XFTPStoreConfig s -> XFTPServerConfig -> IO (XFTPEnv s)
newXFTPServerEnv storeCfg config@XFTPServerConfig {fileSizeQuota, xftpCredentials, httpCredentials} = do
random <- C.newRandom
store <- newFileStore ()
storeLog <- mapM (`readWriteFileStore` store) storeLogFile
(store, storeLog) <- case storeCfg of
XSCMemory storeLogPath -> do
st <- newFileStore ()
sl <- mapM (`readWriteFileStore` st) storeLogPath
pure (st, sl)
used <- getUsedStorage store
usedStorage <- newTVarIO used
forM_ fileSizeQuota $ \quota -> do

View File

@@ -28,7 +28,7 @@ import Options.Applicative
import Simplex.FileTransfer.Chunks
import Simplex.FileTransfer.Description (FileSize (..))
import Simplex.FileTransfer.Server (runXFTPServer)
import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..), defFileExpirationHours, defaultFileExpiration, defaultInactiveClientExpiration)
import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..), XFTPStoreConfig (..), defFileExpirationHours, defaultFileExpiration, defaultInactiveClientExpiration)
import Simplex.FileTransfer.Transport (alpnSupportedXFTPhandshakes, supportedFileServerVRange)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
@@ -194,7 +194,8 @@ xftpServerCLI_ generateSite serveStaticFiles cfgPath logPath = do
when (isJust webHttpPort || isJust webHttpsParams') $
serveStaticFiles EmbeddedWebParams {webStaticPath = path, webHttpPort, webHttpsParams = webHttpsParams'}
Nothing -> pure ()
runXFTPServer serverConfig
let storeCfg = XSCMemory $ storeLogFile serverConfig
runXFTPServer storeCfg serverConfig
where
isOnion = \case THOnionHost _ -> True; _ -> False
enableStoreLog = settingIsOn "STORE_LOG" "enable" ini

View File

@@ -14,7 +14,7 @@ import SMPClient (serverBracket)
import Simplex.FileTransfer.Client
import Simplex.FileTransfer.Description
import Simplex.FileTransfer.Server (runXFTPServerBlocking)
import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..), defaultFileExpiration, defaultInactiveClientExpiration)
import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..), XFTPStoreConfig (..), defaultFileExpiration, defaultInactiveClientExpiration)
import Simplex.FileTransfer.Transport (alpnSupportedXFTPhandshakes, supportedFileServerVRange)
import Simplex.Messaging.Protocol (XFTPServer)
import Simplex.Messaging.Transport.HTTP2 (httpALPN)
@@ -58,7 +58,7 @@ withXFTPServerCfgNoALPN cfg = withXFTPServerCfg cfg {transportConfig = (transpor
withXFTPServerCfg :: HasCallStack => XFTPServerConfig -> (HasCallStack => ThreadId -> IO a) -> IO a
withXFTPServerCfg cfg =
serverBracket
(\started -> runXFTPServerBlocking started cfg)
(\started -> runXFTPServerBlocking started (XSCMemory $ storeLogFile cfg) cfg)
(threadDelay 10000)
withXFTPServerThreadOn :: HasCallStack => (HasCallStack => ThreadId -> IO a) -> IO a