xftp-server: add inactiveClientExpiration (#936)

* xftp-server: add inactiveClientExpiration

* fix test config

* add test

* add xftpPing

* switch to PCEUnexpectedResponse

* remove watchdog when server quits

* rename, loop

---------

Co-authored-by: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com>
This commit is contained in:
Alexander Bondarenko
2023-12-27 22:31:19 +02:00
committed by GitHub
parent bb4de2e63c
commit 7eb7bd5e81
7 changed files with 94 additions and 16 deletions

View File

@@ -132,11 +132,15 @@ xftpClientError = \case
HCIOError e -> PCEIOError e
sendXFTPCommand :: forall p. FilePartyI p => XFTPClient -> C.APrivateSignKey -> XFTPFileId -> FileCommand p -> Maybe XFTPChunkSpec -> ExceptT XFTPClientError IO (FileResponse, HTTP2Body)
sendXFTPCommand XFTPClient {config, http2Client = http2@HTTP2Client {sessionId}} pKey fId cmd chunkSpec_ = do
sendXFTPCommand c@XFTPClient {http2Client = HTTP2Client {sessionId}} pKey fId cmd chunkSpec_ = do
t <-
liftEither . first PCETransportError $
xftpEncodeTransmission sessionId (Just pKey) ("", fId, FileCmd (sFileParty @p) cmd)
let req = H.requestStreaming N.methodPost "/" [] $ streamBody t
sendXFTPTransmission c t chunkSpec_
sendXFTPTransmission :: XFTPClient -> ByteString -> Maybe XFTPChunkSpec -> ExceptT XFTPClientError IO (FileResponse, HTTP2Body)
sendXFTPTransmission XFTPClient {config, http2Client = http2@HTTP2Client {sessionId}} t chunkSpec_ = do
let req = H.requestStreaming N.methodPost "/" [] streamBody
reqTimeout = (\XFTPChunkSpec {chunkSize} -> chunkTimeout config chunkSize) <$> chunkSpec_
HTTP2Response {respBody = body@HTTP2Body {bodyHead}} <- liftEitherError xftpClientError $ sendRequest http2 req reqTimeout
when (B.length bodyHead /= xftpBlockSize) $ throwError $ PCEResponseError BLOCK
@@ -148,8 +152,8 @@ sendXFTPCommand XFTPClient {config, http2Client = http2@HTTP2Client {sessionId}}
_ -> pure (r, body)
Left e -> throwError $ PCEResponseError e
where
streamBody :: ByteString -> (Builder -> IO ()) -> IO () -> IO ()
streamBody t send done = do
streamBody :: (Builder -> IO ()) -> IO () -> IO ()
streamBody send done = do
send $ byteString t
forM_ chunkSpec_ $ \XFTPChunkSpec {filePath, chunkOffset, chunkSize} ->
withFile filePath ReadMode $ \h -> do
@@ -207,6 +211,16 @@ deleteXFTPChunk c spKey sId = sendXFTPCommand c spKey sId FDEL Nothing >>= okRes
ackXFTPChunk :: XFTPClient -> C.APrivateSignKey -> RecipientId -> ExceptT XFTPClientError IO ()
ackXFTPChunk c rpKey rId = sendXFTPCommand c rpKey rId FACK Nothing >>= okResponse
pingXFTP :: XFTPClient -> ExceptT XFTPClientError IO ()
pingXFTP c@XFTPClient {http2Client = HTTP2Client {sessionId}} = do
t <-
liftEither . first PCETransportError $
xftpEncodeTransmission sessionId Nothing ("", "", FileCmd SFRecipient PING)
(r, _) <- sendXFTPTransmission c t Nothing
case r of
FRPong -> pure ()
_ -> throwError $ PCEUnexpectedResponse $ bshow r
okResponse :: (FileResponse, HTTP2Body) -> ExceptT XFTPClientError IO ()
okResponse = \case
(FROk, body) -> noFile body ()

View File

@@ -70,7 +70,7 @@ runXFTPServerBlocking :: TMVar Bool -> XFTPServerConfig -> IO ()
runXFTPServerBlocking started cfg = newXFTPServerEnv cfg >>= runReaderT (xftpServer cfg started)
xftpServer :: XFTPServerConfig -> TMVar Bool -> M ()
xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig} started = do
xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpiration} started = do
restoreServerStats
raceAny_ (runServer : expireFilesThread_ cfg <> serverStatsThread_ cfg) `finally` stopServer
where
@@ -79,7 +79,7 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig} started = do
serverParams <- asks tlsServerParams
env <- ask
liftIO $
runHTTP2Server started xftpPort defaultHTTP2BufferSize serverParams transportConfig $ \sessionId r sendResponse -> do
runHTTP2Server started xftpPort defaultHTTP2BufferSize serverParams transportConfig inactiveClientExpiration $ \sessionId r sendResponse -> do
reqBody <- getHTTP2Body r xftpBlockSize
processRequest HTTP2Request {sessionId, request = r, reqBody, sendResponse} `runReaderT` env

View File

@@ -46,6 +46,8 @@ data XFTPServerConfig = XFTPServerConfig
newFileBasicAuth :: Maybe BasicAuth,
-- | time after which the files can be removed and check interval, seconds
fileExpiration :: Maybe ExpirationConfig,
-- | time after which inactive clients can be disconnected and check interval, seconds
inactiveClientExpiration :: Maybe ExpirationConfig,
-- CA certificate private key is not needed for initialization
caCertificateFile :: FilePath,
privateKeyFile :: FilePath,
@@ -58,6 +60,13 @@ data XFTPServerConfig = XFTPServerConfig
transportConfig :: TransportServerConfig
}
defaultInactiveClientExpiration :: ExpirationConfig
defaultInactiveClientExpiration =
ExpirationConfig
{ ttl = 43200, -- seconds, 12 hours
checkInterval = 3600 -- seconds, 1 hours
}
data XFTPEnv = XFTPEnv
{ config :: XFTPServerConfig,
store :: FileStore,

View File

@@ -19,7 +19,7 @@ import Options.Applicative
import Simplex.FileTransfer.Chunks
import Simplex.FileTransfer.Description (FileSize (..))
import Simplex.FileTransfer.Server (runXFTPServer)
import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..), defFileExpirationHours, defaultFileExpiration)
import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..), defFileExpirationHours, defaultFileExpiration, defaultInactiveClientExpiration)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), pattern XFTPServer)
@@ -104,6 +104,12 @@ xftpServerCLI cfgPath logPath = do
\[FILES]\n"
<> ("path: " <> filesPath <> "\n")
<> ("storage_quota: " <> B.unpack (strEncode fileSizeQuota) <> "\n")
<> "\n\
\[INACTIVE_CLIENTS]\n\
\# TTL and interval to check inactive clients\n\
\disconnect: off\n"
<> ("# ttl: " <> show (ttl defaultInactiveClientExpiration) <> "\n")
<> ("# check_interval: " <> show (checkInterval defaultInactiveClientExpiration) <> "\n")
runServer ini = do
hSetBuffering stdout LineBuffering
hSetBuffering stderr LineBuffering
@@ -118,13 +124,16 @@ xftpServerCLI cfgPath logPath = do
enableStoreLog = settingIsOn "STORE_LOG" "enable" ini
logStats = settingIsOn "STORE_LOG" "log_stats" ini
c = combine cfgPath . ($ defaultX509Config)
printXFTPConfig XFTPServerConfig {allowNewFiles, newFileBasicAuth, xftpPort, storeLogFile, fileExpiration} = do
printXFTPConfig XFTPServerConfig {allowNewFiles, newFileBasicAuth, xftpPort, storeLogFile, fileExpiration, inactiveClientExpiration} = do
putStrLn $ case storeLogFile of
Just f -> "Store log: " <> f
_ -> "Store log disabled."
putStrLn $ case fileExpiration of
Just ExpirationConfig {ttl} -> "expiring files after " <> showTTL ttl
_ -> "not expiring files"
putStrLn $ case inactiveClientExpiration of
Just ExpirationConfig {ttl, checkInterval} -> "expiring clients inactive for " <> show ttl <> " seconds every " <> show checkInterval <> " seconds"
_ -> "not expiring inactive clients"
putStrLn $
"Uploading new files "
<> if allowNewFiles
@@ -147,6 +156,12 @@ xftpServerCLI cfgPath logPath = do
defaultFileExpiration
{ ttl = 3600 * readIniDefault defFileExpirationHours "STORE_LOG" "expire_files_hours" ini
},
inactiveClientExpiration =
settingIsOn "INACTIVE_CLIENTS" "disconnect" ini
$> ExpirationConfig
{ ttl = readStrictIni "INACTIVE_CLIENTS" "ttl" ini,
checkInterval = readStrictIni "INACTIVE_CLIENTS" "check_interval" ini
},
caCertificateFile = c caCrtFile,
privateKeyFile = c serverKeyFile,
certificateFile = c serverCrtFile,

View File

@@ -5,15 +5,20 @@ module Simplex.Messaging.Transport.HTTP2.Server where
import Control.Concurrent.Async (Async, async, uninterruptibleCancel)
import Control.Concurrent.STM
import Control.Monad
import Data.Time.Clock.System (getSystemTime, systemSeconds)
import Network.HPACK (BufferSize)
import Network.HTTP2.Server (Request, Response)
import qualified Network.HTTP2.Server as H
import Network.Socket
import qualified Network.TLS as T
import Numeric.Natural (Natural)
import Simplex.Messaging.Transport (SessionId, TLS)
import Simplex.Messaging.Server.Expiration
import Simplex.Messaging.Transport (SessionId, TLS, closeConnection)
import Simplex.Messaging.Transport.HTTP2
import Simplex.Messaging.Transport.Server (TransportServerConfig (..), loadSupportedTLSServerParams, runTransportServer)
import Simplex.Messaging.Util (threadDelay')
import UnliftIO (finally)
import UnliftIO.Concurrent (forkIO, killThread)
type HTTP2ServerFunc = SessionId -> Request -> (Response -> IO ()) -> IO ()
@@ -49,7 +54,7 @@ getHTTP2Server HTTP2ServerConfig {qSize, http2Port, bufferSize, bodyHeadSize, se
started <- newEmptyTMVarIO
reqQ <- newTBQueueIO qSize
action <- async $
runHTTP2Server started http2Port bufferSize tlsServerParams transportConfig $ \sessionId r sendResponse -> do
runHTTP2Server started http2Port bufferSize tlsServerParams transportConfig Nothing $ \sessionId r sendResponse -> do
reqBody <- getHTTP2Body r bodyHeadSize
atomically $ writeTBQueue reqQ HTTP2Request {sessionId, request = r, reqBody, sendResponse}
void . atomically $ takeTMVar started
@@ -58,12 +63,29 @@ getHTTP2Server HTTP2ServerConfig {qSize, http2Port, bufferSize, bodyHeadSize, se
closeHTTP2Server :: HTTP2Server -> IO ()
closeHTTP2Server = uninterruptibleCancel . action
runHTTP2Server :: TMVar Bool -> ServiceName -> BufferSize -> T.ServerParams -> TransportServerConfig -> HTTP2ServerFunc -> IO ()
runHTTP2Server started port bufferSize serverParams transportConfig = runHTTP2ServerWith bufferSize setup
runHTTP2Server :: TMVar Bool -> ServiceName -> BufferSize -> T.ServerParams -> TransportServerConfig -> Maybe ExpirationConfig -> HTTP2ServerFunc -> IO ()
runHTTP2Server started port bufferSize serverParams transportConfig expCfg_ = runHTTP2ServerWith_ expCfg_ bufferSize setup
where
setup = runTransportServer started port serverParams transportConfig
runHTTP2ServerWith :: BufferSize -> ((TLS -> IO ()) -> a) -> (SessionId -> Request -> (Response -> IO ()) -> IO ()) -> a
runHTTP2ServerWith bufferSize setup http2Server = setup $ withHTTP2 bufferSize run
runHTTP2ServerWith :: BufferSize -> ((TLS -> IO ()) -> a) -> HTTP2ServerFunc -> a
runHTTP2ServerWith = runHTTP2ServerWith_ Nothing
runHTTP2ServerWith_ :: Maybe ExpirationConfig -> BufferSize -> ((TLS -> IO ()) -> a) -> HTTP2ServerFunc -> a
runHTTP2ServerWith_ expCfg_ bufferSize setup http2Server = setup $ \tls -> do
activeAt <- newTVarIO =<< getSystemTime
tid_ <- mapM (forkIO . expireInactiveClient tls activeAt) expCfg_
withHTTP2 bufferSize (run activeAt) tls `finally` mapM_ killThread tid_
where
run cfg sessId = H.run cfg $ \req _aux sendResp -> http2Server sessId req (`sendResp` [])
run activeAt cfg sessId = H.run cfg $ \req _aux sendResp -> do
getSystemTime >>= atomically . writeTVar activeAt
http2Server sessId req (`sendResp` [])
expireInactiveClient tls activeAt expCfg = loop
where
loop = do
threadDelay' $ checkInterval expCfg * 1000000
old <- expireBeforeEpoch expCfg
ts <- readTVarIO activeAt
if systemSeconds ts < old
then closeConnection tls
else loop

View File

@@ -12,7 +12,7 @@ import SMPClient (serverBracket)
import Simplex.FileTransfer.Client
import Simplex.FileTransfer.Description
import Simplex.FileTransfer.Server (runXFTPServerBlocking)
import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..), defaultFileExpiration)
import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..), defaultFileExpiration, defaultInactiveClientExpiration)
import Simplex.Messaging.Protocol (XFTPServer)
import Simplex.Messaging.Transport.Server
import Test.Hspec
@@ -105,6 +105,7 @@ testXFTPServerConfig =
allowNewFiles = True,
newFileBasicAuth = Nothing,
fileExpiration = Just defaultFileExpiration,
inactiveClientExpiration = Just defaultInactiveClientExpiration,
caCertificateFile = "tests/fixtures/ca.crt",
privateKeyFile = "tests/fixtures/server.key",
certificateFile = "tests/fixtures/server.crt",

View File

@@ -28,6 +28,7 @@ import qualified Simplex.Messaging.Crypto as C
import qualified Simplex.Messaging.Crypto.Lazy as LC
import Simplex.Messaging.Protocol (BasicAuth, SenderId)
import Simplex.Messaging.Server.Expiration (ExpirationConfig (..))
import Simplex.Messaging.Util (liftIOEither)
import System.Directory (createDirectoryIfMissing, removeDirectoryRecursive, removeFile)
import System.FilePath ((</>))
import Test.Hspec
@@ -49,6 +50,7 @@ xftpServerTests =
it "should acknowledge file chunk reception (2 clients)" testFileChunkAck2
it "should not allow chunks of wrong size" testWrongChunkSize
it "should expire chunks after set interval" testFileChunkExpiration
it "should disconnect inactive clients" testInactiveClientExpiration
it "should not allow uploading chunks after specified storage quota" testFileStorageQuota
it "should store file records to log and restore them after server restart" testFileLog
describe "XFTP basic auth" $ do
@@ -214,6 +216,21 @@ testFileChunkExpiration = withXFTPServerCfg testXFTPServerConfig {fileExpiration
where
fileExpiration = Just ExpirationConfig {ttl = 1, checkInterval = 1}
testInactiveClientExpiration :: Expectation
testInactiveClientExpiration = withXFTPServerCfg testXFTPServerConfig {inactiveClientExpiration} $ \_ -> runRight_ $ do
disconnected <- newEmptyTMVarIO
c <- liftIOEither $ getXFTPClient (1, testXFTPServer, Nothing) testXFTPClientConfig (\_ -> atomically $ putTMVar disconnected ())
pingXFTP c
liftIO $ do
threadDelay 100000
atomically (tryReadTMVar disconnected) `shouldReturn` Nothing
pingXFTP c
liftIO $ do
threadDelay 3000000
atomically (tryTakeTMVar disconnected) `shouldReturn` Just ()
where
inactiveClientExpiration = Just ExpirationConfig {ttl = 1, checkInterval = 1}
testFileStorageQuota :: Expectation
testFileStorageQuota = withXFTPServerCfg testXFTPServerConfig {fileSizeQuota = Just $ chSize * 2} $
\_ -> testXFTPClient $ \c -> runRight_ $ do