diff --git a/simplexmq.cabal b/simplexmq.cabal index 5f6b3f5de..fc87b23ef 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -51,6 +51,7 @@ library Simplex.FileTransfer.Description Simplex.FileTransfer.Protocol Simplex.FileTransfer.Server + Simplex.FileTransfer.Server.Control Simplex.FileTransfer.Server.Env Simplex.FileTransfer.Server.Main Simplex.FileTransfer.Server.Stats diff --git a/src/Simplex/FileTransfer/Server.hs b/src/Simplex/FileTransfer/Server.hs index c835a454b..f4b725462 100644 --- a/src/Simplex/FileTransfer/Server.hs +++ b/src/Simplex/FileTransfer/Server.hs @@ -15,7 +15,6 @@ module Simplex.FileTransfer.Server where import Control.Logger.Simple import Control.Monad import Control.Monad.Except -import Control.Monad.IO.Unlift (MonadUnliftIO) import Control.Monad.Reader import Data.Bifunctor (first) import qualified Data.ByteString.Base64.URL as B64 @@ -33,9 +32,13 @@ import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime) import Data.Time.Clock.System (SystemTime (..), getSystemTime) import Data.Time.Format.ISO8601 (iso8601Show) import Data.Word (Word32) +import GHC.IO.Handle (hSetNewlineMode) +import GHC.Stats (getRTSStats) import qualified Network.HTTP.Types as N import qualified Network.HTTP2.Server as H +import Network.Socket import Simplex.FileTransfer.Protocol +import Simplex.FileTransfer.Server.Control import Simplex.FileTransfer.Server.Env import Simplex.FileTransfer.Server.Stats import Simplex.FileTransfer.Server.Store @@ -48,17 +51,18 @@ import Simplex.Messaging.Protocol (CorrId, RcvPublicDhKey, RcvPublicVerifyKey, R import Simplex.Messaging.Server (dummyVerifyCmd, verifyCmdSignature) import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Server.Stats +import Simplex.Messaging.Transport.Buffer (trimCR) import Simplex.Messaging.Transport.HTTP2 import Simplex.Messaging.Transport.HTTP2.Server +import Simplex.Messaging.Transport.Server (runTCPServer) import Simplex.Messaging.Util import System.Exit (exitFailure) import System.FilePath (()) -import System.IO (BufferMode (..), hPutStrLn, hSetBuffering) -import UnliftIO (IOMode (..), withFile) +import System.IO (hPrint, hPutStrLn, universalNewlineMode) +import UnliftIO import UnliftIO.Concurrent (threadDelay) import UnliftIO.Directory (doesFileExist, removeFile, renameFile) -import UnliftIO.Exception -import UnliftIO.STM +import qualified UnliftIO.Exception as E type M a = ReaderT XFTPEnv IO a @@ -73,7 +77,7 @@ runXFTPServerBlocking started cfg = newXFTPServerEnv cfg >>= runReaderT (xftpSer xftpServer :: XFTPServerConfig -> TMVar Bool -> M () xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpiration} started = do restoreServerStats - raceAny_ (runServer : expireFilesThread_ cfg <> serverStatsThread_ cfg) `finally` stopServer + raceAny_ (runServer : expireFilesThread_ cfg <> serverStatsThread_ cfg <> controlPortThread_ cfg) `finally` stopServer where runServer :: M () runServer = do @@ -164,6 +168,47 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira ] liftIO $ threadDelay' interval + controlPortThread_ :: XFTPServerConfig -> [M ()] + controlPortThread_ XFTPServerConfig {controlPort = Just port} = [runCPServer port] + controlPortThread_ _ = [] + + runCPServer :: ServiceName -> M () + runCPServer port = do + cpStarted <- newEmptyTMVarIO + u <- askUnliftIO + liftIO $ do + labelMyThread "control port server" + runTCPServer cpStarted port $ runCPClient u + where + runCPClient :: UnliftIO (ReaderT XFTPEnv IO) -> Socket -> IO () + runCPClient u sock = do + labelMyThread "control port client" + h <- socketToHandle sock ReadWriteMode + hSetBuffering h LineBuffering + hSetNewlineMode h universalNewlineMode + hPutStrLn h "XFTP server control port\n'help' for supported commands" + cpLoop h + where + cpLoop h = do + s <- B.hGetLine h + case strDecode $ trimCR s of + Right CPQuit -> hClose h + Right cmd -> processCP h cmd >> cpLoop h + Left err -> hPutStrLn h ("error: " <> err) >> cpLoop h + processCP h = \case + CPStatsRTS -> E.tryAny getRTSStats >>= either (hPrint h) (hPrint h) + CPDelete fileId -> unliftIO u $ do + fs <- asks store + r <- runExceptT $ do + let asSender = ExceptT . atomically $ getFile fs SFSender fileId + let asRecipient = ExceptT . atomically $ getFile fs SFRecipient fileId + (fr, _) <- asSender `catchError` const asRecipient + ExceptT $ deleteServerFile_ fr + liftIO . hPutStrLn h $ either (\e -> "error: " <> show e) (\() -> "ok") r + CPHelp -> hPutStrLn h "commands: stats-rts, delete, help, quit" + CPQuit -> pure () + CPSkip -> pure () + data ServerFile = ServerFile { filePath :: FilePath, fileSize :: Word32, @@ -337,21 +382,7 @@ processXFTPRequest HTTP2Body {bodyPart} = \case _ -> pure (FRErr NO_FILE, Nothing) deleteServerFile :: FileRec -> M FileResponse - deleteServerFile FileRec {senderId, fileInfo, filePath} = do - withFileLog (`logDeleteFile` senderId) - r <- runExceptT $ do - path <- readTVarIO filePath - stats <- asks serverStats - ExceptT $ first (\(_ :: SomeException) -> FILE_IO) <$> try (forM_ path $ \p -> whenM (doesFileExist p) (removeFile p >> deletedStats stats)) - st <- asks store - void $ atomically $ deleteFile st senderId - atomically $ modifyTVar' (filesDeleted stats) (+ 1) - pure FROk - either (pure . FRErr) pure r - where - deletedStats stats = do - atomically $ modifyTVar' (filesCount stats) (subtract 1) - atomically $ modifyTVar' (filesSize stats) (subtract $ fromIntegral $ size fileInfo) + deleteServerFile fr = either FRErr (\() -> FROk) <$> deleteServerFile_ fr logFileError :: SomeException -> IO () logFileError e = logError $ "Error deleting file: " <> tshow e @@ -365,6 +396,21 @@ processXFTPRequest HTTP2Body {bodyPart} = \case atomically $ modifyTVar' (fileDownloadAcks stats) (+ 1) pure FROk +deleteServerFile_ :: FileRec -> M (Either XFTPErrorType ()) +deleteServerFile_ FileRec {senderId, fileInfo, filePath} = do + withFileLog (`logDeleteFile` senderId) + runExceptT $ do + path <- readTVarIO filePath + stats <- asks serverStats + ExceptT $ first (\(_ :: SomeException) -> FILE_IO) <$> try (forM_ path $ \p -> whenM (doesFileExist p) (removeFile p >> deletedStats stats)) + st <- asks store + void $ atomically $ deleteFile st senderId + atomically $ modifyTVar' (filesDeleted stats) (+ 1) + where + deletedStats stats = do + atomically $ modifyTVar' (filesCount stats) (subtract 1) + atomically $ modifyTVar' (filesSize stats) (subtract $ fromIntegral $ size fileInfo) + randomId :: (MonadUnliftIO m, MonadReader XFTPEnv m) => Int -> m ByteString randomId n = atomically . C.randomBytes n =<< asks random diff --git a/src/Simplex/FileTransfer/Server/Control.hs b/src/Simplex/FileTransfer/Server/Control.hs new file mode 100644 index 000000000..2a325e83b --- /dev/null +++ b/src/Simplex/FileTransfer/Server/Control.hs @@ -0,0 +1,31 @@ +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE OverloadedStrings #-} + +module Simplex.FileTransfer.Server.Control where + +import qualified Data.Attoparsec.ByteString.Char8 as A +import Data.ByteString (ByteString) +import Simplex.Messaging.Encoding.String + +data ControlProtocol + = CPStatsRTS + | CPDelete ByteString + | CPHelp + | CPQuit + | CPSkip + +instance StrEncoding ControlProtocol where + strEncode = \case + CPStatsRTS -> "stats-rts" + CPDelete bs -> "delete " <> strEncode bs + CPHelp -> "help" + CPQuit -> "quit" + CPSkip -> "" + strP = + A.takeTill (== ' ') >>= \case + "stats-rts" -> pure CPStatsRTS + "delete" -> CPDelete <$> (A.space *> strP) + "help" -> pure CPHelp + "quit" -> pure CPQuit + "" -> pure CPSkip + _ -> fail "bad ControlProtocol command" diff --git a/src/Simplex/FileTransfer/Server/Env.hs b/src/Simplex/FileTransfer/Server/Env.hs index bb49e3192..89763608a 100644 --- a/src/Simplex/FileTransfer/Server/Env.hs +++ b/src/Simplex/FileTransfer/Server/Env.hs @@ -33,6 +33,7 @@ import UnliftIO.STM data XFTPServerConfig = XFTPServerConfig { xftpPort :: ServiceName, + controlPort :: Maybe ServiceName, fileIdSize :: Int, storeLogFile :: Maybe FilePath, filesPath :: FilePath, diff --git a/src/Simplex/FileTransfer/Server/Main.hs b/src/Simplex/FileTransfer/Server/Main.hs index 520da2a20..e3943d489 100644 --- a/src/Simplex/FileTransfer/Server/Main.hs +++ b/src/Simplex/FileTransfer/Server/Main.hs @@ -100,6 +100,7 @@ xftpServerCLI cfgPath logPath = do <> ("host: " <> host <> "\n") <> ("port: " <> defaultServerPort <> "\n") <> "log_tls_errors: off\n\ + \# control_port: 5226\n\ \\n\ \[FILES]\n" <> ("path: " <> filesPath <> "\n") @@ -144,6 +145,7 @@ xftpServerCLI cfgPath logPath = do serverConfig = XFTPServerConfig { xftpPort = T.unpack $ strictIni "TRANSPORT" "port" ini, + controlPort = either (const Nothing) (Just . T.unpack) $ lookupValue "TRANSPORT" "control_port" ini, fileIdSize = 16, storeLogFile = enableStoreLog $> storeLogFilePath, filesPath = T.unpack $ strictIni "FILES" "path" ini, diff --git a/tests/XFTPClient.hs b/tests/XFTPClient.hs index 295aad059..312135072 100644 --- a/tests/XFTPClient.hs +++ b/tests/XFTPClient.hs @@ -97,6 +97,7 @@ testXFTPServerConfig :: XFTPServerConfig testXFTPServerConfig = XFTPServerConfig { xftpPort = xftpTestPort, + controlPort = Nothing, fileIdSize = 16, storeLogFile = Nothing, filesPath = xftpServerFiles,