diff --git a/src/Simplex/FileTransfer/Client.hs b/src/Simplex/FileTransfer/Client.hs index 1b3727d22..9489f52c1 100644 --- a/src/Simplex/FileTransfer/Client.hs +++ b/src/Simplex/FileTransfer/Client.hs @@ -132,11 +132,15 @@ xftpClientError = \case HCIOError e -> PCEIOError e sendXFTPCommand :: forall p. FilePartyI p => XFTPClient -> C.APrivateSignKey -> XFTPFileId -> FileCommand p -> Maybe XFTPChunkSpec -> ExceptT XFTPClientError IO (FileResponse, HTTP2Body) -sendXFTPCommand XFTPClient {config, http2Client = http2@HTTP2Client {sessionId}} pKey fId cmd chunkSpec_ = do +sendXFTPCommand c@XFTPClient {http2Client = HTTP2Client {sessionId}} pKey fId cmd chunkSpec_ = do t <- liftEither . first PCETransportError $ xftpEncodeTransmission sessionId (Just pKey) ("", fId, FileCmd (sFileParty @p) cmd) - let req = H.requestStreaming N.methodPost "/" [] $ streamBody t + sendXFTPTransmission c t chunkSpec_ + +sendXFTPTransmission :: XFTPClient -> ByteString -> Maybe XFTPChunkSpec -> ExceptT XFTPClientError IO (FileResponse, HTTP2Body) +sendXFTPTransmission XFTPClient {config, http2Client = http2@HTTP2Client {sessionId}} t chunkSpec_ = do + let req = H.requestStreaming N.methodPost "/" [] streamBody reqTimeout = (\XFTPChunkSpec {chunkSize} -> chunkTimeout config chunkSize) <$> chunkSpec_ HTTP2Response {respBody = body@HTTP2Body {bodyHead}} <- liftEitherError xftpClientError $ sendRequest http2 req reqTimeout when (B.length bodyHead /= xftpBlockSize) $ throwError $ PCEResponseError BLOCK @@ -148,8 +152,8 @@ sendXFTPCommand XFTPClient {config, http2Client = http2@HTTP2Client {sessionId}} _ -> pure (r, body) Left e -> throwError $ PCEResponseError e where - streamBody :: ByteString -> (Builder -> IO ()) -> IO () -> IO () - streamBody t send done = do + streamBody :: (Builder -> IO ()) -> IO () -> IO () + streamBody send done = do send $ byteString t forM_ chunkSpec_ $ \XFTPChunkSpec {filePath, chunkOffset, chunkSize} -> withFile filePath ReadMode $ \h -> do @@ -207,6 +211,16 @@ deleteXFTPChunk c spKey sId = sendXFTPCommand c spKey sId FDEL Nothing >>= okRes ackXFTPChunk :: XFTPClient -> C.APrivateSignKey -> RecipientId -> ExceptT XFTPClientError IO () ackXFTPChunk c rpKey rId = sendXFTPCommand c rpKey rId FACK Nothing >>= okResponse +pingXFTP :: XFTPClient -> ExceptT XFTPClientError IO () +pingXFTP c@XFTPClient {http2Client = HTTP2Client {sessionId}} = do + t <- + liftEither . first PCETransportError $ + xftpEncodeTransmission sessionId Nothing ("", "", FileCmd SFRecipient PING) + (r, _) <- sendXFTPTransmission c t Nothing + case r of + FRPong -> pure () + _ -> throwError $ PCEUnexpectedResponse $ bshow r + okResponse :: (FileResponse, HTTP2Body) -> ExceptT XFTPClientError IO () okResponse = \case (FROk, body) -> noFile body () diff --git a/src/Simplex/FileTransfer/Server.hs b/src/Simplex/FileTransfer/Server.hs index 87c06226b..2d69c96fb 100644 --- a/src/Simplex/FileTransfer/Server.hs +++ b/src/Simplex/FileTransfer/Server.hs @@ -70,7 +70,7 @@ runXFTPServerBlocking :: TMVar Bool -> XFTPServerConfig -> IO () runXFTPServerBlocking started cfg = newXFTPServerEnv cfg >>= runReaderT (xftpServer cfg started) xftpServer :: XFTPServerConfig -> TMVar Bool -> M () -xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig} started = do +xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpiration} started = do restoreServerStats raceAny_ (runServer : expireFilesThread_ cfg <> serverStatsThread_ cfg) `finally` stopServer where @@ -79,7 +79,7 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig} started = do serverParams <- asks tlsServerParams env <- ask liftIO $ - runHTTP2Server started xftpPort defaultHTTP2BufferSize serverParams transportConfig $ \sessionId r sendResponse -> do + runHTTP2Server started xftpPort defaultHTTP2BufferSize serverParams transportConfig inactiveClientExpiration $ \sessionId r sendResponse -> do reqBody <- getHTTP2Body r xftpBlockSize processRequest HTTP2Request {sessionId, request = r, reqBody, sendResponse} `runReaderT` env diff --git a/src/Simplex/FileTransfer/Server/Env.hs b/src/Simplex/FileTransfer/Server/Env.hs index d9b20be19..bb49e3192 100644 --- a/src/Simplex/FileTransfer/Server/Env.hs +++ b/src/Simplex/FileTransfer/Server/Env.hs @@ -46,6 +46,8 @@ data XFTPServerConfig = XFTPServerConfig newFileBasicAuth :: Maybe BasicAuth, -- | time after which the files can be removed and check interval, seconds fileExpiration :: Maybe ExpirationConfig, + -- | time after which inactive clients can be disconnected and check interval, seconds + inactiveClientExpiration :: Maybe ExpirationConfig, -- CA certificate private key is not needed for initialization caCertificateFile :: FilePath, privateKeyFile :: FilePath, @@ -58,6 +60,13 @@ data XFTPServerConfig = XFTPServerConfig transportConfig :: TransportServerConfig } +defaultInactiveClientExpiration :: ExpirationConfig +defaultInactiveClientExpiration = + ExpirationConfig + { ttl = 43200, -- seconds, 12 hours + checkInterval = 3600 -- seconds, 1 hours + } + data XFTPEnv = XFTPEnv { config :: XFTPServerConfig, store :: FileStore, diff --git a/src/Simplex/FileTransfer/Server/Main.hs b/src/Simplex/FileTransfer/Server/Main.hs index cfa318c31..0dee944fb 100644 --- a/src/Simplex/FileTransfer/Server/Main.hs +++ b/src/Simplex/FileTransfer/Server/Main.hs @@ -19,7 +19,7 @@ import Options.Applicative import Simplex.FileTransfer.Chunks import Simplex.FileTransfer.Description (FileSize (..)) import Simplex.FileTransfer.Server (runXFTPServer) -import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..), defFileExpirationHours, defaultFileExpiration) +import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..), defFileExpirationHours, defaultFileExpiration, defaultInactiveClientExpiration) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), pattern XFTPServer) @@ -104,6 +104,12 @@ xftpServerCLI cfgPath logPath = do \[FILES]\n" <> ("path: " <> filesPath <> "\n") <> ("storage_quota: " <> B.unpack (strEncode fileSizeQuota) <> "\n") + <> "\n\ + \[INACTIVE_CLIENTS]\n\ + \# TTL and interval to check inactive clients\n\ + \disconnect: off\n" + <> ("# ttl: " <> show (ttl defaultInactiveClientExpiration) <> "\n") + <> ("# check_interval: " <> show (checkInterval defaultInactiveClientExpiration) <> "\n") runServer ini = do hSetBuffering stdout LineBuffering hSetBuffering stderr LineBuffering @@ -118,13 +124,16 @@ xftpServerCLI cfgPath logPath = do enableStoreLog = settingIsOn "STORE_LOG" "enable" ini logStats = settingIsOn "STORE_LOG" "log_stats" ini c = combine cfgPath . ($ defaultX509Config) - printXFTPConfig XFTPServerConfig {allowNewFiles, newFileBasicAuth, xftpPort, storeLogFile, fileExpiration} = do + printXFTPConfig XFTPServerConfig {allowNewFiles, newFileBasicAuth, xftpPort, storeLogFile, fileExpiration, inactiveClientExpiration} = do putStrLn $ case storeLogFile of Just f -> "Store log: " <> f _ -> "Store log disabled." putStrLn $ case fileExpiration of Just ExpirationConfig {ttl} -> "expiring files after " <> showTTL ttl _ -> "not expiring files" + putStrLn $ case inactiveClientExpiration of + Just ExpirationConfig {ttl, checkInterval} -> "expiring clients inactive for " <> show ttl <> " seconds every " <> show checkInterval <> " seconds" + _ -> "not expiring inactive clients" putStrLn $ "Uploading new files " <> if allowNewFiles @@ -147,6 +156,12 @@ xftpServerCLI cfgPath logPath = do defaultFileExpiration { ttl = 3600 * readIniDefault defFileExpirationHours "STORE_LOG" "expire_files_hours" ini }, + inactiveClientExpiration = + settingIsOn "INACTIVE_CLIENTS" "disconnect" ini + $> ExpirationConfig + { ttl = readStrictIni "INACTIVE_CLIENTS" "ttl" ini, + checkInterval = readStrictIni "INACTIVE_CLIENTS" "check_interval" ini + }, caCertificateFile = c caCrtFile, privateKeyFile = c serverKeyFile, certificateFile = c serverCrtFile, diff --git a/src/Simplex/Messaging/Transport/HTTP2/Server.hs b/src/Simplex/Messaging/Transport/HTTP2/Server.hs index 139205235..e6dda40a1 100644 --- a/src/Simplex/Messaging/Transport/HTTP2/Server.hs +++ b/src/Simplex/Messaging/Transport/HTTP2/Server.hs @@ -5,15 +5,20 @@ module Simplex.Messaging.Transport.HTTP2.Server where import Control.Concurrent.Async (Async, async, uninterruptibleCancel) import Control.Concurrent.STM import Control.Monad +import Data.Time.Clock.System (getSystemTime, systemSeconds) import Network.HPACK (BufferSize) import Network.HTTP2.Server (Request, Response) import qualified Network.HTTP2.Server as H import Network.Socket import qualified Network.TLS as T import Numeric.Natural (Natural) -import Simplex.Messaging.Transport (SessionId, TLS) +import Simplex.Messaging.Server.Expiration +import Simplex.Messaging.Transport (SessionId, TLS, closeConnection) import Simplex.Messaging.Transport.HTTP2 import Simplex.Messaging.Transport.Server (TransportServerConfig (..), loadSupportedTLSServerParams, runTransportServer) +import Simplex.Messaging.Util (threadDelay') +import UnliftIO (finally) +import UnliftIO.Concurrent (forkIO, killThread) type HTTP2ServerFunc = SessionId -> Request -> (Response -> IO ()) -> IO () @@ -49,7 +54,7 @@ getHTTP2Server HTTP2ServerConfig {qSize, http2Port, bufferSize, bodyHeadSize, se started <- newEmptyTMVarIO reqQ <- newTBQueueIO qSize action <- async $ - runHTTP2Server started http2Port bufferSize tlsServerParams transportConfig $ \sessionId r sendResponse -> do + runHTTP2Server started http2Port bufferSize tlsServerParams transportConfig Nothing $ \sessionId r sendResponse -> do reqBody <- getHTTP2Body r bodyHeadSize atomically $ writeTBQueue reqQ HTTP2Request {sessionId, request = r, reqBody, sendResponse} void . atomically $ takeTMVar started @@ -58,12 +63,29 @@ getHTTP2Server HTTP2ServerConfig {qSize, http2Port, bufferSize, bodyHeadSize, se closeHTTP2Server :: HTTP2Server -> IO () closeHTTP2Server = uninterruptibleCancel . action -runHTTP2Server :: TMVar Bool -> ServiceName -> BufferSize -> T.ServerParams -> TransportServerConfig -> HTTP2ServerFunc -> IO () -runHTTP2Server started port bufferSize serverParams transportConfig = runHTTP2ServerWith bufferSize setup +runHTTP2Server :: TMVar Bool -> ServiceName -> BufferSize -> T.ServerParams -> TransportServerConfig -> Maybe ExpirationConfig -> HTTP2ServerFunc -> IO () +runHTTP2Server started port bufferSize serverParams transportConfig expCfg_ = runHTTP2ServerWith_ expCfg_ bufferSize setup where setup = runTransportServer started port serverParams transportConfig -runHTTP2ServerWith :: BufferSize -> ((TLS -> IO ()) -> a) -> (SessionId -> Request -> (Response -> IO ()) -> IO ()) -> a -runHTTP2ServerWith bufferSize setup http2Server = setup $ withHTTP2 bufferSize run +runHTTP2ServerWith :: BufferSize -> ((TLS -> IO ()) -> a) -> HTTP2ServerFunc -> a +runHTTP2ServerWith = runHTTP2ServerWith_ Nothing + +runHTTP2ServerWith_ :: Maybe ExpirationConfig -> BufferSize -> ((TLS -> IO ()) -> a) -> HTTP2ServerFunc -> a +runHTTP2ServerWith_ expCfg_ bufferSize setup http2Server = setup $ \tls -> do + activeAt <- newTVarIO =<< getSystemTime + tid_ <- mapM (forkIO . expireInactiveClient tls activeAt) expCfg_ + withHTTP2 bufferSize (run activeAt) tls `finally` mapM_ killThread tid_ where - run cfg sessId = H.run cfg $ \req _aux sendResp -> http2Server sessId req (`sendResp` []) + run activeAt cfg sessId = H.run cfg $ \req _aux sendResp -> do + getSystemTime >>= atomically . writeTVar activeAt + http2Server sessId req (`sendResp` []) + expireInactiveClient tls activeAt expCfg = loop + where + loop = do + threadDelay' $ checkInterval expCfg * 1000000 + old <- expireBeforeEpoch expCfg + ts <- readTVarIO activeAt + if systemSeconds ts < old + then closeConnection tls + else loop diff --git a/tests/XFTPClient.hs b/tests/XFTPClient.hs index 50d75377a..295aad059 100644 --- a/tests/XFTPClient.hs +++ b/tests/XFTPClient.hs @@ -12,7 +12,7 @@ import SMPClient (serverBracket) import Simplex.FileTransfer.Client import Simplex.FileTransfer.Description import Simplex.FileTransfer.Server (runXFTPServerBlocking) -import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..), defaultFileExpiration) +import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..), defaultFileExpiration, defaultInactiveClientExpiration) import Simplex.Messaging.Protocol (XFTPServer) import Simplex.Messaging.Transport.Server import Test.Hspec @@ -105,6 +105,7 @@ testXFTPServerConfig = allowNewFiles = True, newFileBasicAuth = Nothing, fileExpiration = Just defaultFileExpiration, + inactiveClientExpiration = Just defaultInactiveClientExpiration, caCertificateFile = "tests/fixtures/ca.crt", privateKeyFile = "tests/fixtures/server.key", certificateFile = "tests/fixtures/server.crt", diff --git a/tests/XFTPServerTests.hs b/tests/XFTPServerTests.hs index cb939e07e..e2cac1b13 100644 --- a/tests/XFTPServerTests.hs +++ b/tests/XFTPServerTests.hs @@ -28,6 +28,7 @@ import qualified Simplex.Messaging.Crypto as C import qualified Simplex.Messaging.Crypto.Lazy as LC import Simplex.Messaging.Protocol (BasicAuth, SenderId) import Simplex.Messaging.Server.Expiration (ExpirationConfig (..)) +import Simplex.Messaging.Util (liftIOEither) import System.Directory (createDirectoryIfMissing, removeDirectoryRecursive, removeFile) import System.FilePath (()) import Test.Hspec @@ -49,6 +50,7 @@ xftpServerTests = it "should acknowledge file chunk reception (2 clients)" testFileChunkAck2 it "should not allow chunks of wrong size" testWrongChunkSize it "should expire chunks after set interval" testFileChunkExpiration + it "should disconnect inactive clients" testInactiveClientExpiration it "should not allow uploading chunks after specified storage quota" testFileStorageQuota it "should store file records to log and restore them after server restart" testFileLog describe "XFTP basic auth" $ do @@ -214,6 +216,21 @@ testFileChunkExpiration = withXFTPServerCfg testXFTPServerConfig {fileExpiration where fileExpiration = Just ExpirationConfig {ttl = 1, checkInterval = 1} +testInactiveClientExpiration :: Expectation +testInactiveClientExpiration = withXFTPServerCfg testXFTPServerConfig {inactiveClientExpiration} $ \_ -> runRight_ $ do + disconnected <- newEmptyTMVarIO + c <- liftIOEither $ getXFTPClient (1, testXFTPServer, Nothing) testXFTPClientConfig (\_ -> atomically $ putTMVar disconnected ()) + pingXFTP c + liftIO $ do + threadDelay 100000 + atomically (tryReadTMVar disconnected) `shouldReturn` Nothing + pingXFTP c + liftIO $ do + threadDelay 3000000 + atomically (tryTakeTMVar disconnected) `shouldReturn` Just () + where + inactiveClientExpiration = Just ExpirationConfig {ttl = 1, checkInterval = 1} + testFileStorageQuota :: Expectation testFileStorageQuota = withXFTPServerCfg testXFTPServerConfig {fileSizeQuota = Just $ chSize * 2} $ \_ -> testXFTPClient $ \c -> runRight_ $ do