diff --git a/src/Simplex/FileTransfer/Server.hs b/src/Simplex/FileTransfer/Server.hs index 6bbf7b85f..dc10e7533 100644 --- a/src/Simplex/FileTransfer/Server.hs +++ b/src/Simplex/FileTransfer/Server.hs @@ -51,7 +51,6 @@ import Simplex.FileTransfer.Server.Env import Simplex.FileTransfer.Server.Prometheus import Simplex.FileTransfer.Server.Stats import Simplex.FileTransfer.Server.Store -import Simplex.FileTransfer.Server.Store.STM (STMFileStore) import Simplex.FileTransfer.Server.StoreLog import Simplex.FileTransfer.Transport import qualified Simplex.Messaging.Crypto as C @@ -88,7 +87,7 @@ import UnliftIO.Concurrent (threadDelay) import UnliftIO.Directory (canonicalizePath, doesFileExist, removeFile, renameFile) import qualified UnliftIO.Exception as E -type M a = ReaderT XFTPEnv IO a +type M s a = ReaderT (XFTPEnv s) IO a data XFTPTransportRequest = XFTPTransportRequest { thParams :: THandleParamsXFTP 'TServer, @@ -112,19 +111,19 @@ corsPreflightHeaders = ("Access-Control-Max-Age", "86400") ] -runXFTPServer :: XFTPServerConfig -> IO () -runXFTPServer cfg = do +runXFTPServer :: FileStoreClass s => XFTPStoreConfig s -> XFTPServerConfig -> IO () +runXFTPServer storeCfg cfg = do started <- newEmptyTMVarIO - runXFTPServerBlocking started cfg + runXFTPServerBlocking started storeCfg cfg -runXFTPServerBlocking :: TMVar Bool -> XFTPServerConfig -> IO () -runXFTPServerBlocking started cfg = newXFTPServerEnv cfg >>= runReaderT (xftpServer cfg started) +runXFTPServerBlocking :: FileStoreClass s => TMVar Bool -> XFTPStoreConfig s -> XFTPServerConfig -> IO () +runXFTPServerBlocking started storeCfg cfg = newXFTPServerEnv storeCfg cfg >>= runReaderT (xftpServer cfg started) data Handshake = HandshakeSent C.PrivateKeyX25519 | HandshakeAccepted (THandleParams XFTPVersion 'TServer) -xftpServer :: XFTPServerConfig -> TMVar Bool -> M () +xftpServer :: forall s. FileStoreClass s => XFTPServerConfig -> TMVar Bool -> M s () xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpiration, fileExpiration, xftpServerVRange} started = do mapM_ (expireServerFiles Nothing) fileExpiration restoreServerStats @@ -137,7 +136,7 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira ) `finally` stopServer where - runServer :: M () + runServer :: M s () runServer = do srvCreds@(chain, pk) <- asks tlsServerCreds httpCreds_ <- asks httpServerCreds @@ -168,7 +167,7 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira Nothing -> pure () Just thParams -> processRequest req0 {thParams} | otherwise -> liftIO . sendResponse $ H.responseNoBody N.ok200 (corsHeaders addCORS') - xftpServerHandshakeV1 :: X.CertificateChain -> C.APrivateSignKey -> TMap SessionId Handshake -> XFTPTransportRequest -> M (Maybe (THandleParams XFTPVersion 'TServer)) + xftpServerHandshakeV1 :: X.CertificateChain -> C.APrivateSignKey -> TMap SessionId Handshake -> XFTPTransportRequest -> M s (Maybe (THandleParams XFTPVersion 'TServer)) xftpServerHandshakeV1 chain serverSignKey sessions XFTPTransportRequest {thParams = thParams0@THandleParams {sessionId}, request, reqBody = HTTP2Body {bodyHead}, sendResponse, sniUsed, addCORS} = do s <- atomically $ TM.lookup sessionId sessions r <- runExceptT $ case s of @@ -227,39 +226,41 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira liftIO . sendResponse $ H.responseNoBody N.ok200 (corsHeaders addCORS) pure Nothing Nothing -> throwE HANDSHAKE - sendError :: XFTPErrorType -> M (Maybe (THandleParams XFTPVersion 'TServer)) + sendError :: XFTPErrorType -> M s (Maybe (THandleParams XFTPVersion 'TServer)) sendError err = do runExceptT (encodeXftp err) >>= \case Right bs -> liftIO . sendResponse $ H.responseBuilder N.ok200 (corsHeaders addCORS) bs Left _ -> logError $ "Error encoding handshake error: " <> tshow err pure Nothing - encodeXftp :: Encoding a => a -> ExceptT XFTPErrorType (ReaderT XFTPEnv IO) Builder + encodeXftp :: Encoding a => a -> ExceptT XFTPErrorType (ReaderT (XFTPEnv s) IO) Builder encodeXftp a = byteString <$> liftHS (C.pad (smpEncode a) xftpBlockSize) liftHS = liftEitherWith (const HANDSHAKE) - stopServer :: M () + stopServer :: M s () stopServer = do withFileLog closeStoreLog + st <- asks store + liftIO $ closeFileStore st saveServerStats logNote "Server stopped" - expireFilesThread_ :: XFTPServerConfig -> [M ()] + expireFilesThread_ :: XFTPServerConfig -> [M s ()] expireFilesThread_ XFTPServerConfig {fileExpiration = Just fileExp} = [expireFiles fileExp] expireFilesThread_ _ = [] - expireFiles :: ExpirationConfig -> M () + expireFiles :: ExpirationConfig -> M s () expireFiles expCfg = do let interval = checkInterval expCfg * 1000000 forever $ do liftIO $ threadDelay' interval expireServerFiles (Just 100000) expCfg - serverStatsThread_ :: XFTPServerConfig -> [M ()] + serverStatsThread_ :: XFTPServerConfig -> [M s ()] serverStatsThread_ XFTPServerConfig {logStatsInterval = Just interval, logStatsStartTime, serverStatsLogFile} = [logServerStats logStatsStartTime interval serverStatsLogFile] serverStatsThread_ _ = [] - logServerStats :: Int64 -> Int64 -> FilePath -> M () + logServerStats :: Int64 -> Int64 -> FilePath -> M s () logServerStats startAt logInterval statsFilePath = do initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath @@ -300,12 +301,12 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira ] liftIO $ threadDelay' interval - prometheusMetricsThread_ :: XFTPServerConfig -> [M ()] + prometheusMetricsThread_ :: XFTPServerConfig -> [M s ()] prometheusMetricsThread_ XFTPServerConfig {prometheusInterval = Just interval, prometheusMetricsFile} = [savePrometheusMetrics interval prometheusMetricsFile] prometheusMetricsThread_ _ = [] - savePrometheusMetrics :: Int -> FilePath -> M () + savePrometheusMetrics :: Int -> FilePath -> M s () savePrometheusMetrics saveInterval metricsFile = do labelMyThread "savePrometheusMetrics" liftIO $ putStrLn $ "Prometheus metrics saved every " <> show saveInterval <> " seconds to " <> metricsFile @@ -324,11 +325,11 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira let fd = periodStatDataCounts $ _filesDownloaded d pure FileServerMetrics {statsData = d, filesDownloadedPeriods = fd, rtsOptions} - controlPortThread_ :: XFTPServerConfig -> [M ()] + controlPortThread_ :: XFTPServerConfig -> [M s ()] controlPortThread_ XFTPServerConfig {controlPort = Just port} = [runCPServer port] controlPortThread_ _ = [] - runCPServer :: ServiceName -> M () + runCPServer :: ServiceName -> M s () runCPServer port = do cpStarted <- newEmptyTMVarIO u <- askUnliftIO @@ -336,7 +337,7 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira labelMyThread "control port server" runLocalTCPServer cpStarted port $ runCPClient u where - runCPClient :: UnliftIO (ReaderT XFTPEnv IO) -> Socket -> IO () + runCPClient :: UnliftIO (ReaderT (XFTPEnv s) IO) -> Socket -> IO () runCPClient u sock = do labelMyThread "control port client" h <- socketToHandle sock ReadWriteMode @@ -395,7 +396,7 @@ data ServerFile = ServerFile sbState :: LC.SbState } -processRequest :: XFTPTransportRequest -> M () +processRequest :: FileStoreClass s => XFTPTransportRequest -> M s () processRequest XFTPTransportRequest {thParams, reqBody = body@HTTP2Body {bodyHead}, sendResponse, addCORS} | B.length bodyHead /= xftpBlockSize = sendXFTPResponse ("", NoEntity, FRErr BLOCK) Nothing | otherwise = @@ -430,7 +431,7 @@ processRequest XFTPTransportRequest {thParams, reqBody = body@HTTP2Body {bodyHea done #ifdef slow_servers -randomDelay :: M () +randomDelay :: M s () randomDelay = do d <- asks $ responseDelay . config when (d > 0) $ do @@ -440,14 +441,14 @@ randomDelay = do data VerificationResult = VRVerified XFTPRequest | VRFailed XFTPErrorType -verifyXFTPTransmission :: Maybe (THandleAuth 'TServer) -> SignedTransmission FileCmd -> M VerificationResult +verifyXFTPTransmission :: forall s. FileStoreClass s => Maybe (THandleAuth 'TServer) -> SignedTransmission FileCmd -> M s VerificationResult verifyXFTPTransmission thAuth (tAuth, authorized, (corrId, fId, cmd)) = case cmd of FileCmd SFSender (FNEW file rcps auth') -> pure $ XFTPReqNew file rcps auth' `verifyWith` sndKey file FileCmd SFRecipient PING -> pure $ VRVerified XFTPReqPing FileCmd party _ -> verifyCmd party where - verifyCmd :: SFileParty p -> M VerificationResult + verifyCmd :: SFileParty p -> M s VerificationResult verifyCmd party = do st <- asks store liftIO (getFile st party fId) >>= \case @@ -463,7 +464,7 @@ verifyXFTPTransmission thAuth (tAuth, authorized, (corrId, fId, cmd)) = -- TODO verify with DH authorization req `verifyWith` k = if verifyCmdAuthorization thAuth tAuth authorized corrId k then VRVerified req else VRFailed AUTH -processXFTPRequest :: HTTP2Body -> XFTPRequest -> M (FileResponse, Maybe ServerFile) +processXFTPRequest :: forall s. FileStoreClass s => HTTP2Body -> XFTPRequest -> M s (FileResponse, Maybe ServerFile) processXFTPRequest HTTP2Body {bodyPart} = \case XFTPReqNew file rks auth -> noFile =<< ifM allowNew (createFile file rks) (pure $ FRErr AUTH) where @@ -482,7 +483,7 @@ processXFTPRequest HTTP2Body {bodyPart} = \case XFTPReqPing -> noFile FRPong where noFile resp = pure (resp, Nothing) - createFile :: FileInfo -> NonEmpty RcvPublicAuthKey -> M FileResponse + createFile :: FileInfo -> NonEmpty RcvPublicAuthKey -> M s FileResponse createFile file rks = do st <- asks store r <- runExceptT $ do @@ -501,25 +502,25 @@ processXFTPRequest HTTP2Body {bodyPart} = \case let rIds = L.map (\(FileRecipient rId _) -> rId) rcps pure $ FRSndIds sId rIds pure $ either FRErr id r - addFileRetry :: STMFileStore -> FileInfo -> Int -> RoundedFileTime -> M (Either XFTPErrorType XFTPFileId) + addFileRetry :: s -> FileInfo -> Int -> RoundedFileTime -> M s (Either XFTPErrorType XFTPFileId) addFileRetry st file n ts = retryAdd n $ \sId -> runExceptT $ do ExceptT $ addFile st sId file ts EntityActive pure sId - addRecipientRetry :: STMFileStore -> Int -> XFTPFileId -> RcvPublicAuthKey -> M (Either XFTPErrorType FileRecipient) + addRecipientRetry :: s -> Int -> XFTPFileId -> RcvPublicAuthKey -> M s (Either XFTPErrorType FileRecipient) addRecipientRetry st n sId rpk = retryAdd n $ \rId -> runExceptT $ do let rcp = FileRecipient rId rpk ExceptT $ addRecipient st sId rcp pure rcp - retryAdd :: Int -> (XFTPFileId -> IO (Either XFTPErrorType a)) -> M (Either XFTPErrorType a) + retryAdd :: Int -> (XFTPFileId -> IO (Either XFTPErrorType a)) -> M s (Either XFTPErrorType a) retryAdd 0 _ = pure $ Left INTERNAL retryAdd n add = do fId <- getFileId liftIO (add fId) >>= \case Left DUPLICATE_ -> retryAdd (n - 1) add r -> pure r - addRecipients :: XFTPFileId -> NonEmpty RcvPublicAuthKey -> M FileResponse + addRecipients :: XFTPFileId -> NonEmpty RcvPublicAuthKey -> M s FileResponse addRecipients sId rks = do st <- asks store r <- runExceptT $ do @@ -530,7 +531,7 @@ processXFTPRequest HTTP2Body {bodyPart} = \case let rIds = L.map (\(FileRecipient rId _) -> rId) rcps pure $ FRRcvIds rIds pure $ either FRErr id r - receiveServerFile :: FileRec -> M FileResponse + receiveServerFile :: FileRec -> M s FileResponse receiveServerFile FileRec {senderId, fileInfo = FileInfo {size, digest}, filePath} = case bodyPart of Nothing -> pure $ FRErr SIZE -- TODO validate body size from request before downloading, once it's populated @@ -573,7 +574,7 @@ processXFTPRequest HTTP2Body {bodyPart} = \case receiveChunk spec = do t <- asks $ fileTimeout . config liftIO $ fromMaybe (Left TIMEOUT) <$> timeout t (runExceptT $ receiveFile getBody spec) - sendServerFile :: FileRec -> RcvPublicDhKey -> M (FileResponse, Maybe ServerFile) + sendServerFile :: FileRec -> RcvPublicDhKey -> M s (FileResponse, Maybe ServerFile) sendServerFile FileRec {senderId, filePath, fileInfo = FileInfo {size}} rDhKey = do readTVarIO filePath >>= \case Just path -> ifM (doesFileExist path) sendFile (pure (FRErr AUTH, Nothing)) @@ -592,13 +593,13 @@ processXFTPRequest HTTP2Body {bodyPart} = \case _ -> pure (FRErr INTERNAL, Nothing) _ -> pure (FRErr NO_FILE, Nothing) - deleteServerFile :: FileRec -> M FileResponse + deleteServerFile :: FileRec -> M s FileResponse deleteServerFile fr = either FRErr (\() -> FROk) <$> deleteServerFile_ fr logFileError :: SomeException -> IO () logFileError e = logError $ "Error deleting file: " <> tshow e - ackFileReception :: RecipientId -> FileRec -> M FileResponse + ackFileReception :: RecipientId -> FileRec -> M s FileResponse ackFileReception rId fr = do withFileLog (`logAckFile` rId) st <- asks store @@ -606,18 +607,18 @@ processXFTPRequest HTTP2Body {bodyPart} = \case incFileStat fileDownloadAcks pure FROk -deleteServerFile_ :: FileRec -> M (Either XFTPErrorType ()) +deleteServerFile_ :: FileStoreClass s => FileRec -> M s (Either XFTPErrorType ()) deleteServerFile_ fr@FileRec {senderId} = do withFileLog (`logDeleteFile` senderId) deleteOrBlockServerFile_ fr filesDeleted (`deleteFile` senderId) -- this also deletes the file from storage, but doesn't include it in delete statistics -blockServerFile :: FileRec -> BlockingInfo -> M (Either XFTPErrorType ()) +blockServerFile :: FileStoreClass s => FileRec -> BlockingInfo -> M s (Either XFTPErrorType ()) blockServerFile fr@FileRec {senderId} info = do withFileLog $ \sl -> logBlockFile sl senderId info deleteOrBlockServerFile_ fr filesBlocked $ \st -> blockFile st senderId info True -deleteOrBlockServerFile_ :: FileRec -> (FileServerStats -> IORef Int) -> (STMFileStore -> IO (Either XFTPErrorType ())) -> M (Either XFTPErrorType ()) +deleteOrBlockServerFile_ :: FileStoreClass s => FileRec -> (FileServerStats -> IORef Int) -> (s -> IO (Either XFTPErrorType ())) -> M s (Either XFTPErrorType ()) deleteOrBlockServerFile_ FileRec {filePath, fileInfo} stat storeAction = runExceptT $ do path <- readTVarIO filePath stats <- asks serverStats @@ -636,7 +637,7 @@ deleteOrBlockServerFile_ FileRec {filePath, fileInfo} stat storeAction = runExce getFileTime :: IO RoundedFileTime getFileTime = getRoundedSystemTime -expireServerFiles :: Maybe Int -> ExpirationConfig -> M () +expireServerFiles :: FileStoreClass s => Maybe Int -> ExpirationConfig -> M s () expireServerFiles itemDelay expCfg = do st <- asks store us <- asks usedStorage @@ -662,21 +663,21 @@ expireServerFiles itemDelay expCfg = do incFileStat filesExpired unless (null expired) $ expireLoop st us old -randomId :: Int -> M ByteString +randomId :: Int -> M s ByteString randomId n = atomically . C.randomBytes n =<< asks random -getFileId :: M XFTPFileId +getFileId :: M s XFTPFileId getFileId = fmap EntityId . randomId =<< asks (fileIdSize . config) -withFileLog :: (StoreLog 'WriteMode -> IO a) -> M () +withFileLog :: (StoreLog 'WriteMode -> IO a) -> M s () withFileLog action = liftIO . mapM_ action =<< asks storeLog -incFileStat :: (FileServerStats -> IORef Int) -> M () +incFileStat :: (FileServerStats -> IORef Int) -> M s () incFileStat statSel = do stats <- asks serverStats liftIO $ atomicModifyIORef'_ (statSel stats) (+ 1) -saveServerStats :: M () +saveServerStats :: M s () saveServerStats = asks (serverStatsBackupFile . config) >>= mapM_ (\f -> asks serverStats >>= liftIO . getFileServerStatsData >>= liftIO . saveStats f) @@ -686,7 +687,7 @@ saveServerStats = B.writeFile f $ strEncode stats logNote "server stats saved" -restoreServerStats :: M () +restoreServerStats :: FileStoreClass s => M s () restoreServerStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStats where restoreStats f = whenM (doesFileExist f) $ do diff --git a/src/Simplex/FileTransfer/Server/Env.hs b/src/Simplex/FileTransfer/Server/Env.hs index ce1ebecbf..f03dc2f12 100644 --- a/src/Simplex/FileTransfer/Server/Env.hs +++ b/src/Simplex/FileTransfer/Server/Env.hs @@ -9,6 +9,7 @@ module Simplex.FileTransfer.Server.Env ( XFTPServerConfig (..), + XFTPStoreConfig (..), XFTPEnv (..), XFTPRequest (..), defaultInactiveClientExpiration, @@ -87,9 +88,12 @@ defaultInactiveClientExpiration = checkInterval = 3600 -- seconds, 1 hours } -data XFTPEnv = XFTPEnv +data XFTPStoreConfig s where + XSCMemory :: Maybe FilePath -> XFTPStoreConfig STMFileStore + +data XFTPEnv s = XFTPEnv { config :: XFTPServerConfig, - store :: STMFileStore, + store :: s, usedStorage :: TVar Int64, storeLog :: Maybe (StoreLog 'WriteMode), random :: TVar ChaChaDRG, @@ -109,11 +113,14 @@ defaultFileExpiration = checkInterval = 2 * 3600 -- seconds, 2 hours } -newXFTPServerEnv :: XFTPServerConfig -> IO XFTPEnv -newXFTPServerEnv config@XFTPServerConfig {storeLogFile, fileSizeQuota, xftpCredentials, httpCredentials} = do +newXFTPServerEnv :: FileStoreClass s => XFTPStoreConfig s -> XFTPServerConfig -> IO (XFTPEnv s) +newXFTPServerEnv storeCfg config@XFTPServerConfig {fileSizeQuota, xftpCredentials, httpCredentials} = do random <- C.newRandom - store <- newFileStore () - storeLog <- mapM (`readWriteFileStore` store) storeLogFile + (store, storeLog) <- case storeCfg of + XSCMemory storeLogPath -> do + st <- newFileStore () + sl <- mapM (`readWriteFileStore` st) storeLogPath + pure (st, sl) used <- getUsedStorage store usedStorage <- newTVarIO used forM_ fileSizeQuota $ \quota -> do diff --git a/src/Simplex/FileTransfer/Server/Main.hs b/src/Simplex/FileTransfer/Server/Main.hs index 101fe945b..42c53d32c 100644 --- a/src/Simplex/FileTransfer/Server/Main.hs +++ b/src/Simplex/FileTransfer/Server/Main.hs @@ -28,7 +28,7 @@ import Options.Applicative import Simplex.FileTransfer.Chunks import Simplex.FileTransfer.Description (FileSize (..)) import Simplex.FileTransfer.Server (runXFTPServer) -import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..), defFileExpirationHours, defaultFileExpiration, defaultInactiveClientExpiration) +import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..), XFTPStoreConfig (..), defFileExpirationHours, defaultFileExpiration, defaultInactiveClientExpiration) import Simplex.FileTransfer.Transport (alpnSupportedXFTPhandshakes, supportedFileServerVRange) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String @@ -194,7 +194,8 @@ xftpServerCLI_ generateSite serveStaticFiles cfgPath logPath = do when (isJust webHttpPort || isJust webHttpsParams') $ serveStaticFiles EmbeddedWebParams {webStaticPath = path, webHttpPort, webHttpsParams = webHttpsParams'} Nothing -> pure () - runXFTPServer serverConfig + let storeCfg = XSCMemory $ storeLogFile serverConfig + runXFTPServer storeCfg serverConfig where isOnion = \case THOnionHost _ -> True; _ -> False enableStoreLog = settingIsOn "STORE_LOG" "enable" ini diff --git a/tests/XFTPClient.hs b/tests/XFTPClient.hs index 85a1d21b8..6fcc32669 100644 --- a/tests/XFTPClient.hs +++ b/tests/XFTPClient.hs @@ -14,7 +14,7 @@ import SMPClient (serverBracket) import Simplex.FileTransfer.Client import Simplex.FileTransfer.Description import Simplex.FileTransfer.Server (runXFTPServerBlocking) -import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..), defaultFileExpiration, defaultInactiveClientExpiration) +import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..), XFTPStoreConfig (..), defaultFileExpiration, defaultInactiveClientExpiration) import Simplex.FileTransfer.Transport (alpnSupportedXFTPhandshakes, supportedFileServerVRange) import Simplex.Messaging.Protocol (XFTPServer) import Simplex.Messaging.Transport.HTTP2 (httpALPN) @@ -58,7 +58,7 @@ withXFTPServerCfgNoALPN cfg = withXFTPServerCfg cfg {transportConfig = (transpor withXFTPServerCfg :: HasCallStack => XFTPServerConfig -> (HasCallStack => ThreadId -> IO a) -> IO a withXFTPServerCfg cfg = serverBracket - (\started -> runXFTPServerBlocking started cfg) + (\started -> runXFTPServerBlocking started (XSCMemory $ storeLogFile cfg) cfg) (threadDelay 10000) withXFTPServerThreadOn :: HasCallStack => (HasCallStack => ThreadId -> IO a) -> IO a