From bccef0ba473d15240f46cea935e7eeaf6f9dfb53 Mon Sep 17 00:00:00 2001 From: Stanislav Dmitrenko <7953703+avently@users.noreply.github.com> Date: Mon, 13 Feb 2023 16:36:02 +0300 Subject: [PATCH] files: server and client spike - basic upload/download (#591) * Files: main, env, stats, storeLog * Better + transport * Executable * Env * Update Client.hs, Server.hs, and 4 more files... * Answer on request * Delay * Temp file * Bypass cert check * update package.yml, rename * update store log * extend HTTP2 transport * refactor caStore * HTTP2 body * update server stats * file server/client framework * verify server commands * process FNEW command, CLI test works * simple XFTP server test (fails) * fix test, refactor * upload chunk works * receive file chunk in the client * remove transport handshake * typo Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> * fix names --------- Co-authored-by: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> --- apps/xftp-server/Main.hs | 18 ++ cabal.project | 5 + package.yaml | 10 +- rfcs/2022-12-26-simplex-file-transfer.md | 5 +- simplexmq.cabal | 77 ++++- src/Simplex/FileTransfer/Client.hs | 159 +++++++++++ src/Simplex/FileTransfer/Protocol.hs | 86 +++++- src/Simplex/FileTransfer/Server.hs | 267 ++++++++++++++++++ src/Simplex/FileTransfer/Server/Env.hs | 67 +++++ src/Simplex/FileTransfer/Server/Main.hs | 178 ++++++++++++ src/Simplex/FileTransfer/Server/Stats.hs | 93 ++++++ src/Simplex/FileTransfer/Server/Store.hs | 27 +- src/Simplex/FileTransfer/Server/StoreLog.hs | 124 +++++++- src/Simplex/FileTransfer/Transport.hs | 42 +++ src/Simplex/Messaging/Encoding/String.hs | 12 +- .../Notifications/Server/Push/APNS.hs | 1 + src/Simplex/Messaging/Protocol.hs | 4 +- src/Simplex/Messaging/Server.hs | 4 +- src/Simplex/Messaging/Transport.hs | 4 +- src/Simplex/Messaging/Transport/HTTP2.hs | 17 +- .../Messaging/Transport/HTTP2/Client.hs | 34 ++- tests/AgentTests/FunctionalAPITests.hs | 2 +- tests/AgentTests/NotificationTests.hs | 3 +- tests/CLITests.hs | 27 ++ tests/NtfClient.hs | 15 +- tests/Test.hs | 5 +- tests/XFTPClient.hs | 61 ++++ tests/XFTPServerTests.hs | 57 ++++ 28 files changed, 1330 insertions(+), 74 deletions(-) create mode 100644 apps/xftp-server/Main.hs create mode 100644 src/Simplex/FileTransfer/Server/Env.hs create mode 100644 src/Simplex/FileTransfer/Server/Stats.hs create mode 100644 src/Simplex/FileTransfer/Transport.hs create mode 100644 tests/XFTPClient.hs create mode 100644 tests/XFTPServerTests.hs diff --git a/apps/xftp-server/Main.hs b/apps/xftp-server/Main.hs new file mode 100644 index 000000000..ca3528872 --- /dev/null +++ b/apps/xftp-server/Main.hs @@ -0,0 +1,18 @@ +module Main where + +import Control.Logger.Simple +import Simplex.FileTransfer.Server.Main + +cfgPath :: FilePath +cfgPath = "/etc/opt/simplex-xftp" + +logPath :: FilePath +logPath = "/var/opt/simplex-xftp" + +logCfg :: LogConfig +logCfg = LogConfig {lc_file = Nothing, lc_stderr = True} + +main :: IO () +main = do + setLogLevel LogDebug -- change to LogError in production + withGlobalLogging logCfg $ xftpServerCLI cfgPath logPath diff --git a/cabal.project b/cabal.project index 020cc61f7..9ec665d9f 100644 --- a/cabal.project +++ b/cabal.project @@ -12,6 +12,11 @@ source-repository-package location: https://github.com/simplex-chat/hs-socks.git tag: a30cc7a79a08d8108316094f8f2f82a0c5e1ac51 +source-repository-package + type: git + location: https://github.com/kazu-yamamoto/http2.git + tag: 1136bb126636789cec197e5d0bae39aa63c6f9e5 + source-repository-package type: git location: https://github.com/simplex-chat/direct-sqlcipher.git diff --git a/package.yaml b/package.yaml index 636923529..9bc66a81b 100644 --- a/package.yaml +++ b/package.yaml @@ -42,7 +42,7 @@ dependencies: - directory == 1.3.* - filepath == 1.4.* - http-types == 0.12.* - - http2 == 3.0.* + - http2 == 4.0.* - generic-random >= 1.3 && < 1.5 - ini == 0.4.1 - iso8601-time == 0.1.* @@ -104,6 +104,14 @@ executables: ghc-options: - -threaded + xftp-server: + source-dirs: apps/xftp-server + main: Main.hs + dependencies: + - simplexmq + ghc-options: + - -threaded + smp-agent: source-dirs: apps/smp-agent main: Main.hs diff --git a/rfcs/2022-12-26-simplex-file-transfer.md b/rfcs/2022-12-26-simplex-file-transfer.md index a607726fb..9ca7a99d9 100644 --- a/rfcs/2022-12-26-simplex-file-transfer.md +++ b/rfcs/2022-12-26-simplex-file-transfer.md @@ -132,16 +132,15 @@ File description format (yml): ``` name: file.ext size: 33200000 -chunk: 8Mb +chunk: 8mb hash: abc= key: abc= iv: abc= -part_hashes: [def=, def=, def=, def=] parts: - server: xftp://abc=@example1.com chunks: [1:abc=:def=:ghi=, 3:abc=:def=:ghi=] - server: xftp://abc=@example2.com - chunks: [2:abc=:def=:ghi=, 4:abc=:def=:ghi=] + chunks: [2:abc=:def=:ghi=, 4:abc=:def=:ghi=:2mb] - server: xftp://abc=@example3.com chunks: [1:abc=:def=, 4:abc=:def=] - server: xftp://abc=@example4.com diff --git a/simplexmq.cabal b/simplexmq.cabal index 386ac9f9c..2611610c1 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -40,9 +40,12 @@ library Simplex.FileTransfer.Description Simplex.FileTransfer.Protocol Simplex.FileTransfer.Server + Simplex.FileTransfer.Server.Env Simplex.FileTransfer.Server.Main + Simplex.FileTransfer.Server.Stats Simplex.FileTransfer.Server.Store Simplex.FileTransfer.Server.StoreLog + Simplex.FileTransfer.Transport Simplex.Messaging.Agent Simplex.Messaging.Agent.Client Simplex.Messaging.Agent.Env.SQLite @@ -139,7 +142,7 @@ library , filepath ==1.4.* , generic-random >=1.3 && <1.5 , http-types ==0.12.* - , http2 ==3.0.* + , http2 ==4.0.* , ini ==0.4.1 , iso8601-time ==0.1.* , memory ==0.15.* @@ -201,7 +204,7 @@ executable ntf-server , filepath ==1.4.* , generic-random >=1.3 && <1.5 , http-types ==0.12.* - , http2 ==3.0.* + , http2 ==4.0.* , ini ==0.4.1 , iso8601-time ==0.1.* , memory ==0.15.* @@ -264,7 +267,7 @@ executable smp-agent , filepath ==1.4.* , generic-random >=1.3 && <1.5 , http-types ==0.12.* - , http2 ==3.0.* + , http2 ==4.0.* , ini ==0.4.1 , iso8601-time ==0.1.* , memory ==0.15.* @@ -327,7 +330,7 @@ executable smp-server , filepath ==1.4.* , generic-random >=1.3 && <1.5 , http-types ==0.12.* - , http2 ==3.0.* + , http2 ==4.0.* , ini ==0.4.1 , iso8601-time ==0.1.* , memory ==0.15.* @@ -360,6 +363,68 @@ executable smp-server if flag(swift) cpp-options: -DswiftJSON +executable xftp-server + main-is: Main.hs + other-modules: + Paths_simplexmq + hs-source-dirs: + apps/xftp-server + ghc-options: -Wall -Wcompat -Werror=incomplete-patterns -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-uni-patterns -Wunused-type-patterns -threaded + build-depends: + QuickCheck ==2.14.* + , aeson ==2.0.* + , ansi-terminal >=0.10 && <0.12 + , asn1-encoding ==0.9.* + , asn1-types ==0.3.* + , async ==2.2.* + , attoparsec ==0.14.* + , base >=4.14 && <5 + , base64-bytestring >=1.0 && <1.3 + , bytestring ==0.10.* + , case-insensitive ==1.2.* + , composition ==1.0.* + , constraints >=0.12 && <0.14 + , containers ==0.6.* + , cryptonite >=0.27 && <0.30 + , cryptostore ==0.2.* + , data-default ==0.7.* + , direct-sqlcipher ==2.3.* + , directory ==1.3.* + , filepath ==1.4.* + , generic-random >=1.3 && <1.5 + , http-types ==0.12.* + , http2 ==4.0.* + , ini ==0.4.1 + , iso8601-time ==0.1.* + , memory ==0.15.* + , mtl ==2.2.* + , network >=3.1.2.7 && <3.2 + , network-transport ==0.5.4 + , optparse-applicative >=0.15 && <0.17 + , process ==1.6.* + , random >=1.1 && <1.3 + , simple-logger ==0.1.* + , simplexmq + , socks ==0.6.* + , sqlcipher-simple ==0.4.* + , stm ==2.5.* + , template-haskell ==2.16.* + , text ==1.2.* + , time ==1.9.* + , time-compat ==1.9.* + , time-manager ==0.0.* + , tls >=1.6.0 && <1.7 + , transformers ==0.5.* + , unliftio ==0.2.* + , unliftio-core ==0.2.* + , websockets ==0.12.* + , x509 ==1.7.* + , x509-store ==1.6.* + , x509-validation ==1.6.* + default-language: Haskell2010 + if flag(swift) + cpp-options: -DswiftJSON + test-suite smp-server-test type: exitcode-stdio-1.0 main-is: Test.hs @@ -383,6 +448,8 @@ test-suite smp-server-test ServerTests SMPAgentClient SMPClient + XFTPClient + XFTPServerTests Paths_simplexmq hs-source-dirs: tests @@ -413,7 +480,7 @@ test-suite smp-server-test , hspec ==2.7.* , hspec-core ==2.7.* , http-types ==0.12.* - , http2 ==3.0.* + , http2 ==4.0.* , ini ==0.4.1 , iso8601-time ==0.1.* , main-tester ==0.2.* diff --git a/src/Simplex/FileTransfer/Client.hs b/src/Simplex/FileTransfer/Client.hs index b4ca6fa86..c969137fa 100644 --- a/src/Simplex/FileTransfer/Client.hs +++ b/src/Simplex/FileTransfer/Client.hs @@ -1 +1,160 @@ +{-# LANGUAGE BlockArguments #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TypeApplications #-} + module Simplex.FileTransfer.Client where + +import Control.Monad.Except +import Data.Bifunctor (first) +import Data.ByteString.Builder (Builder, byteString) +import Data.ByteString.Char8 (ByteString) +import qualified Data.ByteString.Char8 as B +import Data.Int (Int64) +import Data.List.NonEmpty (NonEmpty (..)) +import qualified Network.HTTP.Types as N +import qualified Network.HTTP2.Client as H +import Simplex.FileTransfer.Protocol +import Simplex.FileTransfer.Transport (receiveFile, sendFile) +import Simplex.Messaging.Client + ( NetworkConfig (..), + ProtocolClientError (..), + TransportSession, + chooseTransportHost, + defaultNetworkConfig, + proxyUsername, + transportClientConfig, + ) +import qualified Simplex.Messaging.Crypto as C +import Simplex.Messaging.Protocol + ( ErrorType (..), + ProtocolServer (ProtocolServer), + RcvPublicDhKey, + RecipientId, + SenderId, + ) +import Simplex.Messaging.Transport (supportedParameters) +import Simplex.Messaging.Transport.Client (TransportClientConfig) +import Simplex.Messaging.Transport.HTTP2 +import Simplex.Messaging.Transport.HTTP2.Client +import Simplex.Messaging.Util (bshow, liftEitherError) +import System.IO (IOMode (..), SeekMode (..)) +import UnliftIO.IO (hSeek, withFile) + +data XFTPClient = XFTPClient + { http2Client :: HTTP2Client, + config :: XFTPClientConfig + } + +data XFTPClientConfig = XFTPClientConfig + { networkConfig :: NetworkConfig + } + +data XFTPChunkBody = XFTPChunkBody + { chunkSize :: Int, + chunkPart :: Int -> IO ByteString, + http2Body :: HTTP2Body + } + +data XFTPChunkSpec = XFTPChunkSpec + { filePath :: FilePath, + chunkOffset :: Int64, + chunkSize :: Int + } + +defaultXFTPClientConfig :: XFTPClientConfig +defaultXFTPClientConfig = XFTPClientConfig {networkConfig = defaultNetworkConfig} + +getXFTPClient :: TransportSession FileResponse -> XFTPClientConfig -> IO () -> IO (Either ProtocolClientError XFTPClient) +getXFTPClient transportSession@(_, srv, _) config@XFTPClientConfig {networkConfig} disconnected = runExceptT $ do + let tcConfig = transportClientConfig networkConfig + http2Config = xftpHTTP2Config tcConfig config + username = proxyUsername transportSession + ProtocolServer _ host port keyHash = srv + useHost <- liftEither $ chooseTransportHost networkConfig host + http2Client <- liftEitherError xftpClientError $ getVerifiedHTTP2Client (Just username) useHost port (Just keyHash) Nothing http2Config disconnected + pure XFTPClient {http2Client, config} + +xftpHTTP2Config :: TransportClientConfig -> XFTPClientConfig -> HTTP2ClientConfig +xftpHTTP2Config transportConfig XFTPClientConfig {networkConfig = NetworkConfig {tcpConnectTimeout}} = + defaultHTTP2ClientConfig + { bodyHeadSize = xftpBlockSize, + suportedTLSParams = supportedParameters, + connTimeout = tcpConnectTimeout, + transportConfig + } + +xftpClientError :: HTTP2ClientError -> ProtocolClientError +xftpClientError = \case + HCResponseTimeout -> PCEResponseTimeout + HCNetworkError -> PCENetworkError + HCIOError e -> PCEIOError e + +sendXFTPCommand :: forall p. FilePartyI p => XFTPClient -> C.APrivateSignKey -> XFTPFileId -> FileCommand p -> Maybe XFTPChunkSpec -> ExceptT ProtocolClientError IO (FileResponse, HTTP2Body) +sendXFTPCommand XFTPClient {http2Client = http2@HTTP2Client {sessionId}} pKey fId cmd chunkSpec_ = do + t <- + liftEither . first PCETransportError $ + xftpEncodeTransmission sessionId (Just pKey) ("", fId, FileCmd (sFileParty @p) cmd) + let req = H.requestStreaming N.methodPost "/" [] $ streamBody t + HTTP2Response {respBody = body@HTTP2Body {bodyHead}} <- liftEitherError xftpClientError $ sendRequestDirect http2 req + when (B.length bodyHead /= xftpBlockSize) $ throwError $ PCEResponseError BLOCK + -- TODO validate that the file ID is the same as in the request? + (_, _, (_, _fId, respOrErr)) <- liftEither . first PCEResponseError $ xftpDecodeTransmission sessionId bodyHead + case respOrErr of + Right r -> pure (r, body) + Left e -> throwError $ PCEResponseError e + where + streamBody :: ByteString -> (Builder -> IO ()) -> IO () -> IO () + streamBody t send done = do + send $ byteString t + forM_ chunkSpec_ $ \XFTPChunkSpec {filePath, chunkOffset, chunkSize} -> + withFile filePath ReadMode $ \h -> do + hSeek h AbsoluteSeek $ fromIntegral chunkOffset + sendFile h send chunkSize + done + +createXFTPChunk :: + XFTPClient -> + C.APrivateSignKey -> + FileInfo -> + NonEmpty C.APublicVerifyKey -> + ExceptT ProtocolClientError IO (SenderId, NonEmpty RecipientId) +createXFTPChunk c spKey file rsps = + sendXFTPCommand c spKey "" (FNEW file rsps) Nothing >>= \case + -- TODO check that body is empty + (FRSndIds sId rIds, _body) -> pure (sId, rIds) + (r, _) -> throwError . PCEUnexpectedResponse $ bshow r + +uploadXFTPChunk :: XFTPClient -> C.APrivateSignKey -> XFTPFileId -> XFTPChunkSpec -> ExceptT ProtocolClientError IO () +uploadXFTPChunk c spKey fId chunkSpec = + sendXFTPCommand c spKey fId FPUT (Just chunkSpec) >>= \case + -- TODO check that body is empty + (FROk, _body) -> pure () + (r, _) -> throwError . PCEUnexpectedResponse $ bshow r + +downloadXFTPChunk :: XFTPClient -> C.APrivateSignKey -> XFTPFileId -> RcvPublicDhKey -> ExceptT ProtocolClientError IO (RcvPublicDhKey, XFTPChunkBody) +downloadXFTPChunk c rpKey fId rKey = + sendXFTPCommand c rpKey fId (FGET rKey) Nothing >>= \case + (FRFile sKey, http2Body@HTTP2Body {bodyHead, bodySize, bodyPart}) -> case bodyPart of + -- TODO atm bodySize is set to 0, so chunkSize will be incorrect + Just chunkPart -> do + let chunk = XFTPChunkBody {chunkSize = bodySize - B.length bodyHead, chunkPart, http2Body} + pure (sKey, chunk) + _ -> throwError $ PCEResponseError NO_MSG + (r, _) -> throwError . PCEUnexpectedResponse $ bshow r + +receiveXFTPChunk :: XFTPChunkBody -> XFTPChunkSpec -> ExceptT ProtocolClientError IO () +receiveXFTPChunk XFTPChunkBody {chunkPart} XFTPChunkSpec {filePath, chunkOffset, chunkSize} = liftIO $ do + withFile filePath WriteMode $ \h -> do + hSeek h AbsoluteSeek $ fromIntegral chunkOffset + -- TODO chunk decryption + void $ receiveFile h chunkPart 0 + +-- FADD :: NonEmpty RcvPublicVerifyKey -> FileCommand Sender +-- FDEL :: FileCommand Sender +-- FACK :: FileCommand Recipient +-- PING :: FileCommand Recipient diff --git a/src/Simplex/FileTransfer/Protocol.hs b/src/Simplex/FileTransfer/Protocol.hs index b26fc5fdf..36369d09f 100644 --- a/src/Simplex/FileTransfer/Protocol.hs +++ b/src/Simplex/FileTransfer/Protocol.hs @@ -11,14 +11,17 @@ module Simplex.FileTransfer.Protocol where +import Data.Bifunctor (first) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Kind (Type) -import Data.List.NonEmpty (NonEmpty) -import Data.Maybe (isJust, isNothing) +import Data.List.NonEmpty (NonEmpty (..)) +import Data.Maybe (isNothing) import Data.Type.Equality import Data.Word (Word32) +import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding +import Simplex.Messaging.Encoding.String import Simplex.Messaging.Notifications.Transport (ntfClientHandshake) import Simplex.Messaging.Protocol ( CommandError (..), @@ -29,11 +32,29 @@ import Simplex.Messaging.Protocol ProtocolType (..), RcvPublicDhKey, RcvPublicVerifyKey, + RecipientId, + SenderId, + SentRawTransmission, + SignedTransmission, SndPublicVerifyKey, + Transmission, + encodeTransmission, messageTagP, + tDecodeParseValidate, + tEncode, + tEncodeBatch, + tParse, _smpP, ) +import Simplex.Messaging.Transport (SessionId, TransportError (..)) import Simplex.Messaging.Util ((<$?>)) +import Simplex.Messaging.Version + +currentXFTPVersion :: Version +currentXFTPVersion = 1 + +xftpBlockSize :: Int +xftpBlockSize = 16384 -- | File protocol clients data FileParty = Recipient | Sender @@ -129,6 +150,8 @@ data FileInfo = FileInfo } deriving (Eq, Show) +type XFTPFileId = ByteString + instance FilePartyI p => ProtocolEncoding (FileCommand p) where type Tag (FileCommand p) = FileCommandTag p encodeProtocol _v = \case @@ -145,14 +168,18 @@ instance FilePartyI p => ProtocolEncoding (FileCommand p) where protocolP v tag = (\(FileCmd _ c) -> checkParty c) <$?> protocolP v (FCT (sFileParty @p) tag) - checkCredentials (sig, _, chunkId, _) cmd = case cmd of + checkCredentials (sig, _, fileId, _) cmd = case cmd of -- FNEW must not have signature and chunk ID FNEW {} - | isJust sig || not (B.null chunkId) -> Left $ CMD HAS_AUTH + | isNothing sig -> Left $ CMD NO_AUTH + | not (B.null fileId) -> Left $ CMD HAS_AUTH | otherwise -> Right cmd + PING + | isNothing sig && B.null fileId -> Right cmd + | otherwise -> Left $ CMD HAS_AUTH -- other client commands must have both signature and queue ID _ - | isNothing sig || B.null chunkId -> Left $ CMD NO_AUTH + | isNothing sig || B.null fileId -> Left $ CMD NO_AUTH | otherwise -> Right cmd instance ProtocolEncoding FileCmd where @@ -178,9 +205,14 @@ instance Encoding FileInfo where smpEncode FileInfo {sndKey, size, digest} = smpEncode (sndKey, size, digest) smpP = FileInfo <$> smpP <*> smpP <*> smpP +instance StrEncoding FileInfo where + strEncode FileInfo {sndKey, size, digest} = strEncode (sndKey, size, digest) + strP = FileInfo <$> strP_ <*> strP_ <*> strP + data FileResponseTag - = FRChunkIds_ + = FRSndIds_ | FRRcvIds_ + | FRFile_ | FROk_ | FRErr_ | FRPong_ @@ -188,8 +220,9 @@ data FileResponseTag instance Encoding FileResponseTag where smpEncode = \case - FRChunkIds_ -> "CHUNK" + FRSndIds_ -> "SIDS" FRRcvIds_ -> "RIDS" + FRFile_ -> "FILE" FROk_ -> "OK" FRErr_ -> "ERR" FRPong_ -> "PONG" @@ -197,16 +230,18 @@ instance Encoding FileResponseTag where instance ProtocolMsgTag FileResponseTag where decodeTag = \case - "CHUNK" -> Just FRChunkIds_ + "SIDS" -> Just FRSndIds_ "RIDS" -> Just FRRcvIds_ + "FILE" -> Just FRFile_ "OK" -> Just FROk_ "ERR" -> Just FRErr_ "PONG" -> Just FRPong_ _ -> Nothing data FileResponse - = FRChunkIds FileChunkId (NonEmpty FileChunkId) - | FRRcvIds (NonEmpty FileChunkId) + = FRSndIds SenderId (NonEmpty RecipientId) + | FRRcvIds (NonEmpty RecipientId) + | FRFile RcvPublicDhKey | FROk | FRErr ErrorType | FRPong @@ -215,8 +250,9 @@ data FileResponse instance ProtocolEncoding FileResponse where type Tag FileResponse = FileResponseTag encodeProtocol _v = \case - FRChunkIds chId rIds -> e (FRChunkIds_, ' ', chId, rIds) + FRSndIds fId rIds -> e (FRSndIds_, ' ', fId, rIds) FRRcvIds rIds -> e (FRRcvIds_, ' ', rIds) + FRFile rKey -> e (FRFile_, ' ', rKey) FROk -> e FROk_ FRErr err -> e (FRErr_, ' ', err) FRPong -> e FRPong_ @@ -225,14 +261,15 @@ instance ProtocolEncoding FileResponse where e = smpEncode protocolP _v = \case - FRChunkIds_ -> FRChunkIds <$> _smpP <*> smpP + FRSndIds_ -> FRSndIds <$> _smpP <*> smpP FRRcvIds_ -> FRRcvIds <$> _smpP + FRFile_ -> FRFile <$> _smpP FROk_ -> pure FROk FRErr_ -> FRErr <$> _smpP FRPong_ -> pure FRPong checkCredentials (_, _, entId, _) cmd = case cmd of - FRChunkIds {} -> noEntity + FRSndIds {} -> noEntity -- ERR response does not always have entity ID FRErr _ -> Right cmd -- PONG response must not have queue ID @@ -246,8 +283,6 @@ instance ProtocolEncoding FileResponse where | B.null entId = Right cmd | otherwise = Left $ CMD HAS_AUTH -type FileChunkId = ByteString - checkParty :: forall t p p'. (FilePartyI p, FilePartyI p') => t p' -> Either String (t p) checkParty c = case testEquality (sFileParty @p) (sFileParty @p') of Just Refl -> Right c @@ -257,3 +292,24 @@ checkParty' :: forall t p p'. (FilePartyI p, FilePartyI p') => t p' -> Maybe (t checkParty' c = case testEquality (sFileParty @p) (sFileParty @p') of Just Refl -> Just c _ -> Nothing + +xftpEncodeTransmission :: ProtocolEncoding c => SessionId -> Maybe C.APrivateSignKey -> Transmission c -> Either TransportError ByteString +xftpEncodeTransmission sessionId pKey (corrId, fId, msg) = do + let t = encodeTransmission currentXFTPVersion sessionId (corrId, fId, msg) + xftpEncodeBatch1 $ signTransmission t + where + signTransmission :: ByteString -> SentRawTransmission + signTransmission t = ((`C.sign` t) <$> pKey, t) + +-- this function uses batch syntax but puts only one transmission in the batch +xftpEncodeBatch1 :: (Maybe C.ASignature, ByteString) -> Either TransportError ByteString +xftpEncodeBatch1 (sig, t) = + let t' = tEncodeBatch 1 . smpEncode . Large $ tEncode (sig, t) + in first (const TELargeMsg) $ C.pad t' xftpBlockSize + +xftpDecodeTransmission :: ProtocolEncoding c => SessionId -> ByteString -> Either ErrorType (SignedTransmission c) +xftpDecodeTransmission sessionId t = do + t' <- first (const LARGE_MSG) $ C.unPad t + case tParse True t' of + t'' :| [] -> Right $ tDecodeParseValidate sessionId currentXFTPVersion t'' + _ -> Left BLOCK diff --git a/src/Simplex/FileTransfer/Server.hs b/src/Simplex/FileTransfer/Server.hs index a44e6c636..26ba37ae5 100644 --- a/src/Simplex/FileTransfer/Server.hs +++ b/src/Simplex/FileTransfer/Server.hs @@ -1 +1,268 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE NumericUnderscores #-} +{-# LANGUAGE OverloadedLists #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TupleSections #-} + module Simplex.FileTransfer.Server where + +import Control.Logger.Simple +import Control.Monad.Except +import Control.Monad.IO.Unlift (MonadUnliftIO) +import Control.Monad.Reader +import Crypto.Random (getRandomBytes) +import qualified Data.ByteString.Base64.URL as B64 +import Data.ByteString.Builder (byteString) +import Data.ByteString.Char8 (ByteString) +import qualified Data.ByteString.Char8 as B +import Data.Functor (($>)) +import Data.List (intercalate) +import qualified Data.List.NonEmpty as L +import qualified Data.Text as T +import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime) +import Data.Time.Format.ISO8601 (iso8601Show) +import Data.Word (Word32) +import qualified Network.HTTP.Types as N +import qualified Network.HTTP2.Server as H +import Simplex.FileTransfer.Protocol +import Simplex.FileTransfer.Server.Env +import Simplex.FileTransfer.Server.Stats +import Simplex.FileTransfer.Server.Store +import Simplex.FileTransfer.Transport (receiveFile, sendFile) +import qualified Simplex.Messaging.Crypto as C +import Simplex.Messaging.Encoding.String +import Simplex.Messaging.Protocol (CorrId, ErrorType (..), RcvPublicDhKey) +import Simplex.Messaging.Server (dummyVerifyCmd, verifyCmdSignature) +import Simplex.Messaging.Server.Stats +import Simplex.Messaging.Server.StoreLog (StoreLog, closeStoreLog) +import Simplex.Messaging.Transport.HTTP2 +import Simplex.Messaging.Transport.HTTP2.Server +import Simplex.Messaging.Util +import System.Exit (exitFailure) +import System.FilePath (()) +import System.IO (BufferMode (..), hPutStrLn, hSetBuffering) +import UnliftIO (IOMode (..), withFile) +import UnliftIO.Concurrent (threadDelay) +import UnliftIO.Directory (doesFileExist, removeFile, renameFile) +import UnliftIO.Exception +import UnliftIO.STM + +type M a = ReaderT XFTPEnv IO a + +runXFTPServer :: XFTPServerConfig -> IO () +runXFTPServer cfg = do + started <- newEmptyTMVarIO + runXFTPServerBlocking started cfg + +runXFTPServerBlocking :: TMVar Bool -> XFTPServerConfig -> IO () +runXFTPServerBlocking started cfg = newXFTPServerEnv cfg >>= runReaderT (xftpServer cfg started) + +xftpServer :: XFTPServerConfig -> TMVar Bool -> M () +xftpServer cfg@XFTPServerConfig {xftpPort, logTLSErrors} started = do + restoreServerStats + raceAny_ (runServer : serverStatsThread_ cfg) `finally` stopServer + where + runServer :: M () + runServer = do + serverParams <- asks tlsServerParams + env <- ask + liftIO $ + runHTTP2Server started xftpPort defaultHTTP2BufferSize serverParams logTLSErrors $ \sessionId r sendResponse -> do + reqBody <- getHTTP2Body r xftpBlockSize + processRequest HTTP2Request {sessionId, request = r, reqBody, sendResponse} `runReaderT` env + + stopServer :: M () + stopServer = do + withFileLog closeStoreLog + saveServerStats + + serverStatsThread_ :: XFTPServerConfig -> [M ()] + serverStatsThread_ XFTPServerConfig {logStatsInterval = Just interval, logStatsStartTime, serverStatsLogFile} = + [logServerStats logStatsStartTime interval serverStatsLogFile] + serverStatsThread_ _ = [] + + logServerStats :: Int -> Int -> FilePath -> M () + logServerStats startAt logInterval statsFilePath = do + initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime + liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath + threadDelay $ 1_000_000 * (initialDelay + if initialDelay < 0 then 86_400 else 0) + FileServerStats {fromTime, filesCreated, fileRecipients, filesUploaded, filesDeleted, filesDownloaded, fileDownloads, fileDownloadAcks} <- asks serverStats + let interval = 1_000_000 * logInterval + forever $ do + withFile statsFilePath AppendMode $ \h -> liftIO $ do + hSetBuffering h LineBuffering + ts <- getCurrentTime + fromTime' <- atomically $ swapTVar fromTime ts + filesCreated' <- atomically $ swapTVar filesCreated 0 + fileRecipients' <- atomically $ swapTVar fileRecipients 0 + filesUploaded' <- atomically $ swapTVar filesUploaded 0 + filesDeleted' <- atomically $ swapTVar filesDeleted 0 + files <- atomically $ periodStatCounts filesDownloaded ts + fileDownloads' <- atomically $ swapTVar fileDownloads 0 + fileDownloadAcks' <- atomically $ swapTVar fileDownloadAcks 0 + hPutStrLn h $ + intercalate + "," + [ iso8601Show $ utctDay fromTime', + show filesCreated', + show fileRecipients', + show filesUploaded', + show filesDeleted', + dayCount files, + weekCount files, + monthCount files, + show fileDownloads', + show fileDownloadAcks' + ] + threadDelay interval + +-- TODO add client DH secret +data ServerFile = ServerFile + { filePath :: FilePath, + fileSize :: Word32, + fileDhSecret :: C.DhSecretX25519 + } + +processRequest :: HTTP2Request -> M () +processRequest HTTP2Request {sessionId, reqBody = body@HTTP2Body {bodyHead}, sendResponse} + | B.length bodyHead /= xftpBlockSize = sendXFTPResponse ("", "", FRErr BLOCK) Nothing + | otherwise = do + case xftpDecodeTransmission sessionId bodyHead of + Right (sig_, signed, (corrId, fId, cmdOrErr)) -> do + case cmdOrErr of + Right cmd -> do + verifyXFTPTransmission sig_ signed fId cmd >>= \case + VRVerified req -> uncurry send =<< processXFTPRequest body req + VRFailed -> send (FRErr AUTH) Nothing + Left e -> send (FRErr e) Nothing + where + send resp = sendXFTPResponse (corrId, fId, resp) + Left e -> sendXFTPResponse ("", "", FRErr e) Nothing + where + sendXFTPResponse :: (CorrId, XFTPFileId, FileResponse) -> Maybe ServerFile -> M () + sendXFTPResponse (corrId, fId, resp) serverFile_ = do + -- liftIO . sendResponse . H.responseBuilder N.ok200 [] . byteString $ + let t_ = xftpEncodeTransmission sessionId Nothing (corrId, fId, resp) + liftIO $ sendResponse $ H.responseStreaming N.ok200 [] $ streamBody t_ + where + streamBody t_ send done = do + case t_ of + Left _ -> send "padding error" -- TODO respond with BLOCK error? + Right t -> do + send $ byteString t + -- TODO chunk encryption + forM_ serverFile_ $ \ServerFile {filePath, fileSize, fileDhSecret} -> do + withFile filePath ReadMode $ \h -> sendFile h send $ fromIntegral fileSize + done + +data VerificationResult = VRVerified XFTPRequest | VRFailed + +verifyXFTPTransmission :: Maybe C.ASignature -> ByteString -> XFTPFileId -> FileCmd -> M VerificationResult +verifyXFTPTransmission sig_ signed fId cmd = + case cmd of + FileCmd SSender (FNEW file rcps) -> pure $ XFTPReqNew file rcps `verifyWith` sndKey file + FileCmd SRecipient PING -> pure $ VRVerified XFTPReqPing + FileCmd party _ -> verifyCmd party + where + verifyCmd :: SFileParty p -> M VerificationResult + verifyCmd party = do + st <- asks store + atomically $ verify <$> getFile st party fId + where + verify = \case + Right (fr, k) -> XFTPReqCmd fr cmd `verifyWith` k + _ -> maybe False (dummyVerifyCmd signed) sig_ `seq` VRFailed + req `verifyWith` k = if verifyCmdSignature sig_ signed k then VRVerified req else VRFailed + +processXFTPRequest :: HTTP2Body -> XFTPRequest -> M (FileResponse, Maybe ServerFile) +processXFTPRequest HTTP2Body {bodyPart} = \case + XFTPReqNew file rcps -> do + st <- asks store + -- TODO validate body empty + -- TODO retry on duplicate IDs? + sId <- getFileId + rIds <- mapM (const getFileId) rcps + r <- runExceptT $ do + ExceptT $ atomically $ addFile st sId file + forM (L.zip rIds rcps) $ \rcp -> + ExceptT $ atomically $ addRecipient st sId rcp + noFile $ either FRErr (const $ FRSndIds sId rIds) r + XFTPReqCmd fr (FileCmd _ cmd) -> case cmd of + FADD _rcps -> noFile FROk + FPUT -> (,Nothing) <$> receiveServerFile fr + FDEL -> noFile FROk + FGET dhKey -> sendServerFile fr dhKey + FACK -> noFile FROk + -- it should never get to the options below, they are passed in other constructors of XFTPRequest + FNEW _ _ -> noFile $ FRErr INTERNAL + PING -> noFile FRPong + XFTPReqPing -> noFile FRPong + where + noFile resp = pure (resp, Nothing) + receiveServerFile :: FileRec -> M FileResponse + receiveServerFile FileRec {senderId, fileInfo, filePath} = case bodyPart of + Nothing -> pure $ FRErr QUOTA -- TODO file specific errors? + Just getBody -> do + -- TODO validate body size before downloading, once it's populated + path <- asks $ filesPath . config + let fPath = path B.unpack (B64.encode senderId) + FileInfo {size, digest} = fileInfo + size' <- liftIO . withFile fPath WriteMode $ \h -> receiveFile h getBody 0 + if size' == fromIntegral size -- TODO check digest + then atomically $ writeTVar filePath (Just fPath) $> FROk + else whenM (doesFileExist fPath) (removeFile fPath) $> FRErr QUOTA + sendServerFile :: FileRec -> RcvPublicDhKey -> M (FileResponse, Maybe ServerFile) + sendServerFile FileRec {filePath, fileInfo = FileInfo {size}} rKey = do + readTVarIO filePath >>= \case + Just path -> do + (sKey, spKey) <- liftIO C.generateKeyPair' + let fileDhSecret = C.dh' rKey spKey + pure (FRFile sKey, Just ServerFile {filePath = path, fileSize = size, fileDhSecret}) + _ -> pure (FRErr AUTH, Nothing) -- TODO file-specific errors? + +randomId :: (MonadUnliftIO m, MonadReader XFTPEnv m) => Int -> m ByteString +randomId n = do + gVar <- asks idsDrg + atomically (C.pseudoRandomBytes n gVar) + +getFileId :: M XFTPFileId +getFileId = liftIO . getRandomBytes =<< asks (fileIdSize . config) + +withFileLog :: (StoreLog 'WriteMode -> IO a) -> M () +withFileLog action = liftIO . mapM_ action =<< asks storeLog + +incFileStat :: (FileServerStats -> TVar Int) -> M () +incFileStat statSel = do + stats <- asks serverStats + atomically $ modifyTVar (statSel stats) (+ 1) + +saveServerStats :: M () +saveServerStats = + asks (serverStatsBackupFile . config) + >>= mapM_ (\f -> asks serverStats >>= atomically . getFileServerStatsData >>= liftIO . saveStats f) + where + saveStats f stats = do + logInfo $ "saving server stats to file " <> T.pack f + B.writeFile f $ strEncode stats + logInfo "server stats saved" + +restoreServerStats :: M () +restoreServerStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStats + where + restoreStats f = whenM (doesFileExist f) $ do + logInfo $ "restoring server stats from file " <> T.pack f + liftIO (strDecode <$> B.readFile f) >>= \case + Right d -> do + s <- asks serverStats + atomically $ setFileServerStats s d + renameFile f $ f <> ".bak" + logInfo "server stats restored" + Left e -> do + logInfo $ "error restoring server stats: " <> T.pack e + liftIO exitFailure diff --git a/src/Simplex/FileTransfer/Server/Env.hs b/src/Simplex/FileTransfer/Server/Env.hs new file mode 100644 index 000000000..6a7edd36f --- /dev/null +++ b/src/Simplex/FileTransfer/Server/Env.hs @@ -0,0 +1,67 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE KindSignatures #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE StrictData #-} + +module Simplex.FileTransfer.Server.Env where + +import Control.Monad.IO.Unlift +import Crypto.Random +import Data.List.NonEmpty (NonEmpty) +import Data.Time.Clock (getCurrentTime) +import Data.X509.Validation (Fingerprint (..)) +import Network.Socket +import qualified Network.TLS as T +import Simplex.FileTransfer.Protocol (FileCmd, FileInfo) +import Simplex.FileTransfer.Server.Stats +import Simplex.FileTransfer.Server.Store +import Simplex.FileTransfer.Server.StoreLog +import qualified Simplex.Messaging.Crypto as C +import Simplex.Messaging.Protocol (RcvPublicVerifyKey) +import Simplex.Messaging.Transport.Server (loadFingerprint, loadTLSServerParams) +import System.IO (IOMode (..)) +import UnliftIO.STM + +data XFTPServerConfig = XFTPServerConfig + { xftpPort :: ServiceName, + fileIdSize :: Int, + storeLogFile :: Maybe FilePath, + filesPath :: FilePath, + -- CA certificate private key is not needed for initialization + caCertificateFile :: FilePath, + privateKeyFile :: FilePath, + certificateFile :: FilePath, + -- stats config - see SMP server config + logStatsInterval :: Maybe Int, + logStatsStartTime :: Int, + serverStatsLogFile :: FilePath, + serverStatsBackupFile :: Maybe FilePath, + logTLSErrors :: Bool + } + +data XFTPEnv = XFTPEnv + { config :: XFTPServerConfig, + store :: FileStore, + storeLog :: Maybe (StoreLog 'WriteMode), + idsDrg :: TVar ChaChaDRG, + serverIdentity :: C.KeyHash, + tlsServerParams :: T.ServerParams, + serverStats :: FileServerStats + } + +newXFTPServerEnv :: (MonadUnliftIO m, MonadRandom m) => XFTPServerConfig -> m XFTPEnv +newXFTPServerEnv config@XFTPServerConfig {storeLogFile, caCertificateFile, certificateFile, privateKeyFile} = do + idsDrg <- drgNew >>= newTVarIO + store <- atomically newFileStore + storeLog <- liftIO $ mapM (`readWriteFileStore` store) storeLogFile + tlsServerParams <- liftIO $ loadTLSServerParams caCertificateFile certificateFile privateKeyFile + Fingerprint fp <- liftIO $ loadFingerprint caCertificateFile + serverStats <- atomically . newFileServerStats =<< liftIO getCurrentTime + pure XFTPEnv {config, store, storeLog, idsDrg, tlsServerParams, serverIdentity = C.KeyHash fp, serverStats} + +data XFTPRequest + = XFTPReqNew FileInfo (NonEmpty RcvPublicVerifyKey) + | XFTPReqCmd FileRec FileCmd + | XFTPReqPing diff --git a/src/Simplex/FileTransfer/Server/Main.hs b/src/Simplex/FileTransfer/Server/Main.hs index d68227fc2..5f3465698 100644 --- a/src/Simplex/FileTransfer/Server/Main.hs +++ b/src/Simplex/FileTransfer/Server/Main.hs @@ -1 +1,179 @@ +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedLists #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE PatternSynonyms #-} + module Simplex.FileTransfer.Server.Main where + +import Data.Either (fromRight) +import Data.Functor (($>)) +import Data.Ini (lookupValue, readIniFile) +import Data.Maybe (fromMaybe) +import qualified Data.Text as T +import Network.Socket (HostName) +import Options.Applicative +import Simplex.FileTransfer.Server (runXFTPServer) +import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..)) +import qualified Simplex.Messaging.Crypto as C +import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), pattern XFTPServer) +import Simplex.Messaging.Server.CLI +import Simplex.Messaging.Transport.Client (TransportHost (..)) +import System.Directory (createDirectoryIfMissing, doesFileExist) +import System.FilePath (combine) +import System.IO (BufferMode (..), hSetBuffering, stderr, stdout) +import Text.Read (readMaybe) + +xftpServerVersion :: String +xftpServerVersion = "0.1.0" + +xftpServerCLI :: FilePath -> FilePath -> IO () +xftpServerCLI cfgPath logPath = do + getCliCommand' (cliCommandP cfgPath logPath iniFile) serverVersion >>= \case + Init opts -> + doesFileExist iniFile >>= \case + True -> exitError $ "Error: server is already initialized (" <> iniFile <> " exists).\nRun `" <> executableName <> " start`." + _ -> initializeServer opts + Start -> + doesFileExist iniFile >>= \case + True -> readIniFile iniFile >>= either exitError runServer + _ -> exitError $ "Error: server is not initialized (" <> iniFile <> " does not exist).\nRun `" <> executableName <> " init`." + Delete -> do + confirmOrExit "WARNING: deleting the server will make all queues inaccessible, because the server identity (certificate fingerprint) will change.\nTHIS CANNOT BE UNDONE!" + deleteDirIfExists cfgPath + deleteDirIfExists logPath + putStrLn "Deleted configuration and log files" + where + iniFile = combine cfgPath "file-server.ini" + serverVersion = "SimpleX XFTP server v" <> xftpServerVersion + defaultServerPort = "443" + executableName = "file-server" + storeLogFilePath = combine logPath "file-server-store.log" + initializeServer InitOptions {enableStoreLog, signAlgorithm, ip, fqdn, filesPath} = do + clearDirIfExists cfgPath + clearDirIfExists logPath + createDirectoryIfMissing True cfgPath + createDirectoryIfMissing True logPath + let x509cfg = defaultX509Config {commonName = fromMaybe ip fqdn, signAlgorithm} + fp <- createServerX509 cfgPath x509cfg + let host = fromMaybe (if ip == "127.0.0.1" then "" else ip) fqdn + srv = ProtoServerWithAuth (XFTPServer [THDomainName host] "" (C.KeyHash fp)) Nothing + writeFile iniFile $ iniFileContent host + putStrLn $ "Server initialized, you can modify configuration in " <> iniFile <> ".\nRun `" <> executableName <> " start` to start server." + warnCAPrivateKeyFile cfgPath x509cfg + printServiceInfo serverVersion srv + where + iniFileContent host = + "[STORE_LOG]\n\ + \# The server uses STM memory for persistence,\n\ + \# that will be lost on restart (e.g., as with redis).\n\ + \# This option enables saving memory to append only log,\n\ + \# and restoring it when the server is started.\n\ + \# Log is compacted on start (deleted objects are removed).\n" + <> ("enable: " <> onOff enableStoreLog <> "\n\n") + <> "log_stats: off\n\n\ + \[TRANSPORT]\n\ + \# host is only used to print server address on start\n" + <> ("host: " <> host <> "\n") + <> ("port: " <> defaultServerPort <> "\n") + <> "log_tls_errors: off\n\n\ + \[FILES]\n" + <> ("path: " <> filesPath <> "\n") + runServer ini = do + hSetBuffering stdout LineBuffering + hSetBuffering stderr LineBuffering + fp <- checkSavedFingerprint cfgPath defaultX509Config + let host = fromRight "" $ T.unpack <$> lookupValue "TRANSPORT" "host" ini + port = T.unpack $ strictIni "TRANSPORT" "port" ini + cfg@XFTPServerConfig {xftpPort, storeLogFile} = serverConfig + srv = ProtoServerWithAuth (XFTPServer [THDomainName host] (if port == "443" then "" else port) (C.KeyHash fp)) Nothing + printServiceInfo serverVersion srv + printXFTPConfig xftpPort storeLogFile + runXFTPServer cfg + where + enableStoreLog = settingIsOn "STORE_LOG" "enable" ini + logStats = settingIsOn "STORE_LOG" "log_stats" ini + c = combine cfgPath . ($ defaultX509Config) + printXFTPConfig xftpPort logFile = do + putStrLn $ case logFile of + Just f -> "Store log: " <> f + _ -> "Store log disabled." + putStrLn $ "Listening on port " <> xftpPort <> "..." + + serverConfig = + XFTPServerConfig + { xftpPort = T.unpack $ strictIni "TRANSPORT" "port" ini, + fileIdSize = 16, + storeLogFile = enableStoreLog $> storeLogFilePath, + filesPath = T.unpack $ strictIni "FILES" "path" ini, + caCertificateFile = c caCrtFile, + privateKeyFile = c serverKeyFile, + certificateFile = c serverCrtFile, + logStatsInterval = logStats $> 86400, -- seconds + logStatsStartTime = 0, -- seconds from 00:00 UTC + serverStatsLogFile = combine logPath "file-server-stats.daily.log", + serverStatsBackupFile = logStats $> combine logPath "file-server-stats.log", + logTLSErrors = fromMaybe False $ iniOnOff "TRANSPORT" "log_tls_errors" ini + } + +data CliCommand + = Init InitOptions + | Start + | Delete + +data InitOptions = InitOptions + { enableStoreLog :: Bool, + signAlgorithm :: SignAlgorithm, + ip :: HostName, + fqdn :: Maybe HostName, + filesPath :: FilePath + } + deriving (Show) + +cliCommandP :: FilePath -> FilePath -> FilePath -> Parser CliCommand +cliCommandP cfgPath logPath iniFile = + hsubparser + ( command "init" (info (Init <$> initP) (progDesc $ "Initialize server - creates " <> cfgPath <> " and " <> logPath <> " directories and configuration files")) + <> command "start" (info (pure Start) (progDesc $ "Start server (configuration: " <> iniFile <> ")")) + <> command "delete" (info (pure Delete) (progDesc "Delete configuration and log files")) + ) + where + initP :: Parser InitOptions + initP = + InitOptions + <$> switch + ( long "store-log" + <> short 'l' + <> help "Enable store log for persistence" + ) + <*> option + (maybeReader readMaybe) + ( long "sign-algorithm" + <> short 'a' + <> help "Signature algorithm used for TLS certificates: ED25519, ED448" + <> value ED448 + <> showDefault + <> metavar "ALG" + ) + <*> strOption + ( long "ip" + <> help + "Server IP address, used as Common Name for TLS online certificate if FQDN is not supplied" + <> value "127.0.0.1" + <> showDefault + <> metavar "IP" + ) + <*> (optional . strOption) + ( long "fqdn" + <> short 'n' + <> help "Server FQDN used as Common Name for TLS online certificate" + <> showDefault + <> metavar "FQDN" + ) + <*> strOption + ( long "path" + <> short 'p' + <> help "Path to the directory to store files" + <> metavar "PATH" + ) diff --git a/src/Simplex/FileTransfer/Server/Stats.hs b/src/Simplex/FileTransfer/Server/Stats.hs new file mode 100644 index 000000000..ff70eec9d --- /dev/null +++ b/src/Simplex/FileTransfer/Server/Stats.hs @@ -0,0 +1,93 @@ +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} + +module Simplex.FileTransfer.Server.Stats where + +import qualified Data.Attoparsec.ByteString.Char8 as A +import qualified Data.ByteString.Char8 as B +import Data.Time.Clock (UTCTime) +import Simplex.Messaging.Encoding.String +import Simplex.Messaging.Protocol (SenderId) +import Simplex.Messaging.Server.Stats (PeriodStats, PeriodStatsData, getPeriodStatsData, newPeriodStats, setPeriodStats) +import UnliftIO.STM + +data FileServerStats = FileServerStats + { fromTime :: TVar UTCTime, + filesCreated :: TVar Int, + fileRecipients :: TVar Int, + filesUploaded :: TVar Int, + filesDeleted :: TVar Int, + filesDownloaded :: PeriodStats SenderId, + fileDownloads :: TVar Int, + fileDownloadAcks :: TVar Int + } + +data FileServerStatsData = FileServerStatsData + { _fromTime :: UTCTime, + _filesCreated :: Int, + _fileRecipients :: Int, + _filesUploaded :: Int, + _filesDeleted :: Int, + _filesDownloaded :: PeriodStatsData SenderId, + _fileDownloads :: Int, + _fileDownloadAcks :: Int + } + +newFileServerStats :: UTCTime -> STM FileServerStats +newFileServerStats ts = do + fromTime <- newTVar ts + filesCreated <- newTVar 0 + fileRecipients <- newTVar 0 + filesUploaded <- newTVar 0 + filesDeleted <- newTVar 0 + filesDownloaded <- newPeriodStats + fileDownloads <- newTVar 0 + fileDownloadAcks <- newTVar 0 + pure FileServerStats {fromTime, filesCreated, fileRecipients, filesUploaded, filesDeleted, filesDownloaded, fileDownloads, fileDownloadAcks} + +getFileServerStatsData :: FileServerStats -> STM FileServerStatsData +getFileServerStatsData s = do + _fromTime <- readTVar $ fromTime (s :: FileServerStats) + _filesCreated <- readTVar $ filesCreated s + _fileRecipients <- readTVar $ fileRecipients s + _filesUploaded <- readTVar $ filesUploaded s + _filesDeleted <- readTVar $ filesDeleted s + _filesDownloaded <- getPeriodStatsData $ filesDownloaded s + _fileDownloads <- readTVar $ fileDownloads s + _fileDownloadAcks <- readTVar $ fileDownloadAcks s + pure FileServerStatsData {_fromTime, _filesCreated, _fileRecipients, _filesUploaded, _filesDeleted, _filesDownloaded, _fileDownloads, _fileDownloadAcks} + +setFileServerStats :: FileServerStats -> FileServerStatsData -> STM () +setFileServerStats s d = do + writeTVar (fromTime (s :: FileServerStats)) $! _fromTime (d :: FileServerStatsData) + writeTVar (filesCreated s) $! _filesCreated d + writeTVar (fileRecipients s) $! _fileRecipients d + writeTVar (filesUploaded s) $! _filesUploaded d + writeTVar (filesDeleted s) $! _filesDeleted d + setPeriodStats (filesDownloaded s) $! _filesDownloaded d + writeTVar (fileDownloads s) $! _fileDownloads d + writeTVar (fileDownloadAcks s) $! _fileDownloadAcks d + +instance StrEncoding FileServerStatsData where + strEncode FileServerStatsData {_fromTime, _filesCreated, _fileRecipients, _filesUploaded, _filesDeleted, _filesDownloaded, _fileDownloads, _fileDownloadAcks} = + B.unlines + [ "fromTime=" <> strEncode _fromTime, + "filesCreated=" <> strEncode _filesCreated, + "fileRecipients=" <> strEncode _fileRecipients, + "filesUploaded=" <> strEncode _filesUploaded, + "filesDeleted=" <> strEncode _filesDeleted, + "filesDownloaded=" <> strEncode _filesDownloaded, + "fileDownloads=" <> strEncode _fileDownloads, + "fileDownloadAcks=" <> strEncode _fileDownloadAcks + ] + strP = do + _fromTime <- "fromTime=" *> strP <* A.endOfLine + _filesCreated <- "filesCreated=" *> strP <* A.endOfLine + _fileRecipients <- "fileRecipients=" *> strP <* A.endOfLine + _filesUploaded <- "filesUploaded=" *> strP <* A.endOfLine + _filesDeleted <- "filesDeleted=" *> strP <* A.endOfLine + _filesDownloaded <- "filesDownloaded=" *> strP <* A.endOfLine + _fileDownloads <- "fileDownloads=" *> strP <* A.endOfLine + _fileDownloadAcks <- "fileDownloadAcks=" *> strP <* A.endOfLine + pure FileServerStatsData {_fromTime, _filesCreated, _fileRecipients, _filesUploaded, _filesDeleted, _filesDownloaded, _fileDownloads, _fileDownloadAcks} diff --git a/src/Simplex/FileTransfer/Server/Store.hs b/src/Simplex/FileTransfer/Server/Store.hs index da6eb4331..bbacd8335 100644 --- a/src/Simplex/FileTransfer/Server/Store.hs +++ b/src/Simplex/FileTransfer/Server/Store.hs @@ -1,9 +1,12 @@ +{-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE TupleSections #-} module Simplex.FileTransfer.Server.Store - ( FileStore, - newQueueStore, + ( FileStore (..), + FileRec (..), + newFileStore, addFile, setFilePath, addRecipient, @@ -17,7 +20,8 @@ import Control.Concurrent.STM import Data.Functor (($>)) import Data.Set (Set) import qualified Data.Set as S -import Simplex.FileTransfer.Protocol (FileInfo) +import Simplex.FileTransfer.Protocol (FileInfo (..), SFileParty (..), XFTPFileId) +import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Protocol hiding (SParty, SRecipient, SSender) import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM @@ -36,8 +40,8 @@ data FileRec = FileRec } deriving (Eq) -newQueueStore :: STM FileStore -newQueueStore = do +newFileStore :: STM FileStore +newFileStore = do files <- TM.empty recipients <- TM.empty pure FileStore {files, recipients} @@ -61,7 +65,7 @@ setFilePath st sId fPath = writeTVar filePath (Just fPath) $> Right () addRecipient :: FileStore -> SenderId -> (RecipientId, RcvPublicVerifyKey) -> STM (Either ErrorType ()) -addRecipient st@FileStore {recipients} senderId recipient@(rId, _) = +addRecipient st@FileStore {recipients} senderId (rId, rKey) = withFile st senderId $ \FileRec {recipientIds} -> do rIds <- readTVar recipientIds mem <- TM.member rId recipients @@ -69,7 +73,7 @@ addRecipient st@FileStore {recipients} senderId recipient@(rId, _) = then pure $ Left DUPLICATE_ else do writeTVar recipientIds $! S.insert rId rIds - TM.insert rId recipient recipients + TM.insert rId (senderId, rKey) recipients pure $ Right () deleteFile :: FileStore -> SenderId -> STM (Either ErrorType ()) @@ -80,8 +84,13 @@ deleteFile FileStore {files, recipients} senderId = do pure $ Right () _ -> pure $ Left AUTH -getFile :: FileStore -> SenderId -> STM (Either ErrorType FileRec) -getFile st sId = withFile st sId $ pure . Right +getFile :: FileStore -> SFileParty p -> XFTPFileId -> STM (Either ErrorType (FileRec, C.APublicVerifyKey)) +getFile st party fId = case party of + SSender -> withFile st fId $ pure . Right . (\f -> (f, sndKey $ fileInfo f)) + SRecipient -> + TM.lookup fId (recipients st) >>= \case + Just (sId, rKey) -> withFile st sId $ pure . Right . (,rKey) + _ -> pure $ Left AUTH ackFile :: FileStore -> RecipientId -> STM (Either ErrorType ()) ackFile st@FileStore {recipients} recipientId = do diff --git a/src/Simplex/FileTransfer/Server/StoreLog.hs b/src/Simplex/FileTransfer/Server/StoreLog.hs index 3f082c688..4258c6784 100644 --- a/src/Simplex/FileTransfer/Server/StoreLog.hs +++ b/src/Simplex/FileTransfer/Server/StoreLog.hs @@ -1 +1,123 @@ -module Simplex.FileTransfer.Server.StoreLog where +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} + +module Simplex.FileTransfer.Server.StoreLog + ( StoreLog, + FileStoreLogRecord (..), + closeStoreLog, + readWriteFileStore, + logAddFile, + logPutFile, + logAddRecipients, + logDeleteFile, + logAckFile, + ) +where + +import Control.Concurrent.STM +import Control.Monad.Except +import qualified Data.Attoparsec.ByteString.Char8 as A +import qualified Data.ByteString.Char8 as B +import Data.Composition ((.:)) +import Data.List.NonEmpty (NonEmpty) +import qualified Data.List.NonEmpty as L +import Data.Map.Strict (Map) +import qualified Data.Map.Strict as M +import Simplex.FileTransfer.Protocol (FileInfo (..)) +import Simplex.FileTransfer.Server.Store +import Simplex.Messaging.Encoding.String +import Simplex.Messaging.Protocol (RcvPublicVerifyKey, RecipientId, SenderId) +import Simplex.Messaging.Server.StoreLog +import Simplex.Messaging.Util (bshow, whenM) +import System.Directory (doesFileExist, renameFile) +import System.IO + +data FileStoreLogRecord + = AddFile SenderId FileInfo + | PutFile SenderId FilePath + | AddRecipients SenderId (NonEmpty (RecipientId, RcvPublicVerifyKey)) + | DeleteFile SenderId + | AckFile RecipientId + +instance StrEncoding FileStoreLogRecord where + strEncode = \case + AddFile sId file -> strEncode (Str "FNEW", sId, file) + PutFile sId path -> strEncode (Str "FPUT", sId, path) + AddRecipients sId rcps -> strEncode (Str "FADD", sId, rcps) + DeleteFile sId -> strEncode (Str "FDEL", sId) + AckFile rId -> strEncode (Str "FACK", rId) + strP = + A.choice + [ "FNEW " *> (AddFile <$> strP_ <*> strP), + "FPUT " *> (PutFile <$> strP_ <*> strP), + "FADD " *> (AddRecipients <$> strP_ <*> strP), + "FDEL " *> (DeleteFile <$> strP), + "FACK " *> (AckFile <$> strP) + ] + +logFileStoreRecord :: StoreLog 'WriteMode -> FileStoreLogRecord -> IO () +logFileStoreRecord = writeStoreLogRecord + +logAddFile :: StoreLog 'WriteMode -> SenderId -> FileInfo -> IO () +logAddFile s = logFileStoreRecord s .: AddFile + +logPutFile :: StoreLog 'WriteMode -> SenderId -> FilePath -> IO () +logPutFile s = logFileStoreRecord s .: PutFile + +logAddRecipients :: StoreLog 'WriteMode -> SenderId -> NonEmpty (RecipientId, RcvPublicVerifyKey) -> IO () +logAddRecipients s = logFileStoreRecord s .: AddRecipients + +logDeleteFile :: StoreLog 'WriteMode -> SenderId -> IO () +logDeleteFile s = logFileStoreRecord s . DeleteFile + +logAckFile :: StoreLog 'WriteMode -> RecipientId -> IO () +logAckFile s = logFileStoreRecord s . AckFile + +readWriteFileStore :: FilePath -> FileStore -> IO (StoreLog 'WriteMode) +readWriteFileStore f st = do + whenM (doesFileExist f) $ do + readFileStore f st + renameFile f $ f <> ".bak" + s <- openWriteStoreLog f + writeFileStore s st + pure s + +readFileStore :: FilePath -> FileStore -> IO () +readFileStore f st = mapM_ addFileLogRecord . B.lines =<< B.readFile f + where + addFileLogRecord s = case strDecode s of + Left e -> B.putStrLn $ "Log parsing error (" <> B.pack e <> "): " <> B.take 100 s + Right lr -> + atomically (addToStore lr) >>= \case + Left e -> B.putStrLn $ "Log processing error (" <> bshow e <> "): " <> B.take 100 s + _ -> pure () + addToStore = \case + AddFile sId file -> addFile st sId file + PutFile qId path -> setFilePath st qId path + AddRecipients sId rcps -> runExceptT $ addRecipients sId rcps + DeleteFile sId -> deleteFile st sId + AckFile rId -> ackFile st rId + addRecipients sId rcps = mapM_ (ExceptT . addRecipient st sId) rcps + +writeFileStore :: StoreLog 'WriteMode -> FileStore -> IO () +writeFileStore s FileStore {files, recipients} = do + allRcps <- readTVarIO recipients + readTVarIO files >>= mapM_ (logFile allRcps) + where + logFile :: Map RecipientId (SenderId, RcvPublicVerifyKey) -> FileRec -> IO () + logFile allRcps FileRec {senderId, fileInfo, filePath, recipientIds} = do + logAddFile s senderId fileInfo + (rcpErrs, rcps) <- M.mapEither getRcp . M.fromSet id <$> readTVarIO recipientIds + mapM_ (logAddRecipients s senderId) $ L.nonEmpty $ M.elems rcps + mapM_ (B.putStrLn . ("Error storing log: " <>)) rcpErrs + readTVarIO filePath >>= mapM_ (logPutFile s senderId) + where + getRcp rId = case M.lookup rId allRcps of + Just (sndId, rKey) + | sndId == senderId -> Right (rId, rKey) + | otherwise -> Left $ "sender ID for recipient ID " <> bshow rId <> " does not match FileRec" + Nothing -> Left $ "recipient ID " <> bshow rId <> " not found" diff --git a/src/Simplex/FileTransfer/Transport.hs b/src/Simplex/FileTransfer/Transport.hs new file mode 100644 index 000000000..15fad7d4a --- /dev/null +++ b/src/Simplex/FileTransfer/Transport.hs @@ -0,0 +1,42 @@ +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Simplex.FileTransfer.Transport + ( supportedFileServerVRange, + sendFile, + receiveFile, + ) +where + +import Control.Monad.Except +import Data.ByteString.Builder (Builder, byteString) +import Data.ByteString.Char8 (ByteString) +import qualified Data.ByteString.Char8 as B +import GHC.IO.Handle.Internals (ioe_EOF) +import Simplex.FileTransfer.Protocol (xftpBlockSize) +import Simplex.Messaging.Version +import System.IO (Handle) + +supportedFileServerVRange :: VersionRange +supportedFileServerVRange = mkVersionRange 1 1 + +sendFile :: Handle -> (Builder -> IO ()) -> Int -> IO () +sendFile _ _ 0 = pure () +sendFile h send sz = do + B.hGet h xftpBlockSize >>= \case + "" -> when (sz /= 0) ioe_EOF + ch -> do + let ch' = B.take sz ch -- sz >= xftpBlockSize + send (byteString ch') + sendFile h send $ sz - B.length ch' + +-- TODO instead of receiving the whole file this function should stop at size and return error if file is larger +receiveFile :: Handle -> (Int -> IO ByteString) -> Int -> IO Int +receiveFile h receive sz = do + ch <- receive xftpBlockSize + let chSize = B.length ch + if chSize > 0 + then B.hPut h ch >> receiveFile h receive (sz + chSize) + else pure sz diff --git a/src/Simplex/Messaging/Encoding/String.hs b/src/Simplex/Messaging/Encoding/String.hs index 769d591b9..f35251348 100644 --- a/src/Simplex/Messaging/Encoding/String.hs +++ b/src/Simplex/Messaging/Encoding/String.hs @@ -36,7 +36,7 @@ import Data.Text.Encoding (decodeLatin1, encodeUtf8) import Data.Time.Clock (UTCTime) import Data.Time.Clock.System (SystemTime (..)) import Data.Time.Format.ISO8601 -import Data.Word (Word16) +import Data.Word (Word16, Word32) import Simplex.Messaging.Encoding import Simplex.Messaging.Parsers (parseAll) import Simplex.Messaging.Util ((<$?>)) @@ -75,6 +75,10 @@ instance StrEncoding Str where strEncode = unStr strP = Str <$> A.takeTill (== ' ') <* optional A.space +instance StrEncoding FilePath where + strEncode = strEncode + strDecode = strDecode + instance ToJSON Str where toJSON (Str s) = strToJSON s toEncoding (Str s) = strToJEncoding s @@ -94,6 +98,12 @@ instance StrEncoding Word16 where strP = A.decimal {-# INLINE strP #-} +instance StrEncoding Word32 where + strEncode = B.pack . show + {-# INLINE strEncode #-} + strP = A.decimal + {-# INLINE strP #-} + instance StrEncoding Char where strEncode = smpEncode {-# INLINE strEncode #-} diff --git a/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs b/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs index 5eee63447..32f92c6aa 100644 --- a/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs +++ b/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs @@ -343,6 +343,7 @@ apnsPushProviderClient c@APNSPushClient {nonceDrg, apnsCfg} tkn@NtfTknData {toke nonce <- atomically $ C.pseudoRandomCbNonce nonceDrg apnsNtf <- liftEither $ first PPCryptoError $ apnsNotification tkn nonce (paddedNtfLength apnsCfg) pn req <- liftIO $ apnsRequest c tknStr apnsNtf + -- TODO if HTTP2 client is thread-safe, we can use sendRequestDirect (the tests pass) HTTP2Response {response, respBody = HTTP2Body {bodyHead}} <- liftHTTPS2 $ sendRequest http2 req let status = H.responseStatus response reason' = maybe "" reason $ J.decodeStrict' bodyHead diff --git a/src/Simplex/Messaging/Protocol.hs b/src/Simplex/Messaging/Protocol.hs index df2a79f0b..6aae56264 100644 --- a/src/Simplex/Messaging/Protocol.hs +++ b/src/Simplex/Messaging/Protocol.hs @@ -629,7 +629,7 @@ type XFTPServer = ProtocolServer 'PXFTP pattern XFTPServer :: NonEmpty TransportHost -> ServiceName -> C.KeyHash -> ProtocolServer 'PXFTP pattern XFTPServer host port keyHash = ProtocolServer SPXFTP host port keyHash -{-# COMPLETE NtfServer #-} +{-# COMPLETE XFTPServer #-} sameSrvAddr' :: ProtoServerWithAuth p -> ProtoServerWithAuth p -> Bool sameSrvAddr' (ProtoServerWithAuth srv _) (ProtoServerWithAuth srv' _) = sameSrvAddr srv srv' @@ -1217,7 +1217,7 @@ tPut th trs Just ts' -> encodeBatch n' s' ts' _ -> (n', s', Nothing) -tEncode :: C.CryptoSignature s => (s, ByteString) -> ByteString +tEncode :: (Maybe C.ASignature, ByteString) -> ByteString tEncode (sig, t) = smpEncode (C.signatureBytes sig) <> t tEncodeBatch :: Int -> ByteString -> ByteString diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index e66da37be..681d073c3 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -300,7 +300,7 @@ disconnectTransport THandle {connection} c activeAt expCfg = do data VerificationResult = VRVerified (Maybe QueueRec) | VRFailed verifyTransmission :: Maybe C.ASignature -> ByteString -> QueueId -> Cmd -> M VerificationResult -verifyTransmission sig_ signed queueId cmd = do +verifyTransmission sig_ signed queueId cmd = case cmd of Cmd SRecipient (NEW k _ _) -> pure $ Nothing `verified` verifyCmdSignature sig_ signed k Cmd SRecipient _ -> verifyCmd SRecipient $ verifyCmdSignature sig_ signed . recipientKey @@ -311,7 +311,7 @@ verifyTransmission sig_ signed queueId cmd = do verifyCmd :: SParty p -> (QueueRec -> Bool) -> M VerificationResult verifyCmd party f = do st <- asks queueStore - q_ <- atomically (getQueue st party queueId) + q_ <- atomically $ getQueue st party queueId pure $ case q_ of Right q -> Just q `verified` f q _ -> maybe False (dummyVerifyCmd signed) sig_ `seq` VRFailed diff --git a/src/Simplex/Messaging/Transport.hs b/src/Simplex/Messaging/Transport.hs index 2042e530e..9df1a127d 100644 --- a/src/Simplex/Messaging/Transport.hs +++ b/src/Simplex/Messaging/Transport.hs @@ -303,7 +303,7 @@ transportErrorP = "BLOCK" $> TEBadBlock <|> "LARGE_MSG" $> TELargeMsg <|> "SESSION" $> TEBadSession - <|> TEHandshake <$> parseRead1 + <|> "HANDSHAKE " *> (TEHandshake <$> parseRead1) -- | Serialize SMP encrypted transport error. serializeTransportError :: TransportError -> ByteString @@ -311,7 +311,7 @@ serializeTransportError = \case TEBadBlock -> "BLOCK" TELargeMsg -> "LARGE_MSG" TEBadSession -> "SESSION" - TEHandshake e -> bshow e + TEHandshake e -> "HANDSHAKE " <> bshow e -- | Pad and send block to SMP transport. tPutBlock :: Transport c => THandle c -> ByteString -> IO (Either TransportError ()) diff --git a/src/Simplex/Messaging/Transport/HTTP2.hs b/src/Simplex/Messaging/Transport/HTTP2.hs index dc023f920..f258f9dc9 100644 --- a/src/Simplex/Messaging/Transport/HTTP2.hs +++ b/src/Simplex/Messaging/Transport/HTTP2.hs @@ -22,7 +22,7 @@ import qualified System.TimeManager as TI defaultHTTP2BufferSize :: BufferSize defaultHTTP2BufferSize = 32768 -withHTTP2 :: BufferSize -> (Config -> SessionId -> IO ()) -> TLS -> IO () +withHTTP2 :: BufferSize -> (Config -> SessionId -> IO a) -> TLS -> IO a withHTTP2 sz run c = E.bracket (allocHTTP2Config c sz) freeSimpleConfig (`run` tlsUniq c) allocHTTP2Config :: TLS -> BufferSize -> IO Config @@ -56,25 +56,26 @@ data HTTP2Body = HTTP2Body class HTTP2BodyChunk a where getBodyChunk :: a -> IO ByteString - getBodeSize :: a -> Maybe Int + getBodySize :: a -> Maybe Int instance HTTP2BodyChunk HC.Response where getBodyChunk = HC.getResponseBodyChunk {-# INLINE getBodyChunk #-} - getBodeSize = HC.responseBodySize - {-# INLINE getBodeSize #-} + getBodySize = HC.responseBodySize + {-# INLINE getBodySize #-} instance HTTP2BodyChunk HS.Request where getBodyChunk = HS.getRequestBodyChunk {-# INLINE getBodyChunk #-} - getBodeSize = HS.requestBodySize - {-# INLINE getBodeSize #-} + getBodySize = HS.requestBodySize + {-# INLINE getBodySize #-} getHTTP2Body :: HTTP2BodyChunk a => a -> Int -> IO HTTP2Body getHTTP2Body r n = do bodyBuffer <- atomically newTBuffer let getPart n' = getBuffered bodyBuffer n' $ getBodyChunk r bodyHead <- getPart n - let bodySize = fromMaybe 0 $ getBodeSize r - bodyPart = if bodySize > n && B.length bodyHead == n then Just getPart else Nothing + let bodySize = fromMaybe 0 $ getBodySize r + -- TODO check bodySize once it is set + bodyPart = if B.length bodyHead == n then Just getPart else Nothing pure HTTP2Body {bodyHead, bodySize, bodyPart, bodyBuffer} diff --git a/src/Simplex/Messaging/Transport/HTTP2/Client.hs b/src/Simplex/Messaging/Transport/HTTP2/Client.hs index c8d74dbc5..53f2f18f4 100644 --- a/src/Simplex/Messaging/Transport/HTTP2/Client.hs +++ b/src/Simplex/Messaging/Transport/HTTP2/Client.hs @@ -1,4 +1,5 @@ {-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} @@ -6,10 +7,11 @@ module Simplex.Messaging.Transport.HTTP2.Client where import Control.Concurrent.Async -import Control.Exception (IOException) +import Control.Exception (IOException, try) import qualified Control.Exception as E import Control.Monad.Except import Data.ByteString.Char8 (ByteString) +import Data.Functor (($>)) import Data.Time (UTCTime, getCurrentTime) import qualified Data.X509.CertificateStore as XS import Network.HPACK (BufferSize) @@ -27,14 +29,16 @@ import UnliftIO.STM import UnliftIO.Timeout data HTTP2Client = HTTP2Client - { action :: Maybe (Async ()), + { action :: Maybe (Async HTTP2Response), sessionId :: SessionId, sessionTs :: UTCTime, + sendReq :: Request -> (Response -> IO HTTP2Response) -> IO HTTP2Response, client_ :: HClient } data HClient = HClient { connected :: TVar Bool, + disconnected :: IO (), host :: TransportHost, port :: ServiceName, config :: HTTP2ClientConfig, @@ -82,7 +86,7 @@ getVerifiedHTTP2Client proxyUsername host port keyHash caStore config@HTTP2Clien mkHTTPS2Client = do connected <- newTVar False reqQ <- newTBQueue $ qSize config - pure HClient {connected, host, port, config, reqQ} + pure HClient {connected, disconnected, host, port, config, reqQ} runClient :: HClient -> IO (Either HTTP2ClientError HTTP2Client) runClient c = do @@ -97,21 +101,23 @@ getVerifiedHTTP2Client proxyUsername host port keyHash caStore config@HTTP2Clien Just (Left e) -> Left e Nothing -> Left HCNetworkError - client :: HClient -> TMVar (Either HTTP2ClientError HTTP2Client) -> SessionId -> H.Client () + client :: HClient -> TMVar (Either HTTP2ClientError HTTP2Client) -> SessionId -> H.Client HTTP2Response client c cVar sessionId sendReq = do sessionTs <- getCurrentTime - let c' = HTTP2Client {action = Nothing, client_ = c, sessionId, sessionTs} + let c' = HTTP2Client {action = Nothing, client_ = c, sendReq, sessionId, sessionTs} atomically $ do writeTVar (connected c) True putTMVar cVar (Right c') process c' sendReq `E.finally` disconnected - process :: HTTP2Client -> H.Client () + process :: HTTP2Client -> H.Client HTTP2Response process HTTP2Client {client_ = HClient {reqQ}} sendReq = forever $ do (req, respVar) <- atomically $ readTBQueue reqQ sendReq req $ \r -> do respBody <- getHTTP2Body r bodyHeadSize - atomically $ putTMVar respVar HTTP2Response {response = r, respBody} + let resp = HTTP2Response {response = r, respBody} + atomically $ putTMVar respVar resp + pure resp -- | Disconnects client from the server and terminates client threads. closeHTTP2Client :: HTTP2Client -> IO () @@ -123,8 +129,20 @@ sendRequest HTTP2Client {client_ = HClient {config, reqQ}} req = do atomically $ writeTBQueue reqQ (req, resp) maybe (Left HCResponseTimeout) Right <$> (connTimeout config `timeout` atomically (takeTMVar resp)) -runHTTP2Client :: T.Supported -> Maybe XS.CertificateStore -> TransportClientConfig -> BufferSize -> Maybe ByteString -> TransportHost -> ServiceName -> Maybe C.KeyHash -> (SessionId -> H.Client ()) -> IO () +sendRequestDirect :: HTTP2Client -> Request -> IO (Either HTTP2ClientError HTTP2Response) +sendRequestDirect HTTP2Client {client_ = HClient {config, disconnected}, sendReq} req = + connTimeout config `timeout` try (sendReq req process) >>= \case + Just (Right r) -> pure $ Right r + Just (Left e) -> disconnected $> Left (HCIOError e) + Nothing -> pure $ Left HCNetworkError + where + process r = do + respBody <- getHTTP2Body r $ bodyHeadSize config + pure HTTP2Response {response = r, respBody} + +runHTTP2Client :: forall a. T.Supported -> Maybe XS.CertificateStore -> TransportClientConfig -> BufferSize -> Maybe ByteString -> TransportHost -> ServiceName -> Maybe C.KeyHash -> (SessionId -> H.Client a) -> IO a runHTTP2Client tlsParams caStore tcConfig bufferSize proxyUsername host port keyHash client = runTLSTransportClient tlsParams caStore tcConfig proxyUsername host port keyHash $ withHTTP2 bufferSize run where + run :: H.Config -> SessionId -> IO a run cfg = H.run (ClientConfig "https" (strEncode host) 20) cfg . client diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index fdf931df0..7856a002f 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -85,7 +85,7 @@ agentCfgRatchetV1 = agentCfg {e2eEncryptVRange = vr11} vr11 :: VersionRange vr11 = mkVersionRange 1 1 -runRight_ :: HasCallStack => ExceptT AgentErrorType IO () -> Expectation +runRight_ :: (Eq e, Show e, HasCallStack) => ExceptT e IO () -> Expectation runRight_ action = runExceptT action `shouldReturn` Right () runRight :: HasCallStack => ExceptT AgentErrorType IO a -> IO a diff --git a/tests/AgentTests/NotificationTests.hs b/tests/AgentTests/NotificationTests.hs index 6f4fbb65d..a3abd2242 100644 --- a/tests/AgentTests/NotificationTests.hs +++ b/tests/AgentTests/NotificationTests.hs @@ -3,6 +3,7 @@ {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE PatternSynonyms #-} +{-# LANGUAGE TypeApplications #-} {-# OPTIONS_GHC -fno-warn-incomplete-uni-patterns #-} module AgentTests.NotificationTests where @@ -464,7 +465,7 @@ testNotificationsSMPRestart t APNSMockServer {apnsQ} = do liftIO $ killThread threadId pure (aliceId, bobId) - runRight_ $ do + runRight_ @AgentErrorType $ do get alice =##> \case ("", "", DOWN _ [c]) -> c == bobId; _ -> False get bob =##> \case ("", "", DOWN _ [c]) -> c == aliceId; _ -> False diff --git a/tests/CLITests.hs b/tests/CLITests.hs index e18cf1963..44ef4022c 100644 --- a/tests/CLITests.hs +++ b/tests/CLITests.hs @@ -4,6 +4,7 @@ module CLITests where import Data.Ini (lookupValue, readIniFile) import Data.List (isPrefixOf) +import Simplex.FileTransfer.Server.Main (xftpServerCLI, xftpServerVersion) import Simplex.Messaging.Notifications.Server.Main import Simplex.Messaging.Server.Main import Simplex.Messaging.Transport (simplexMQVersion) @@ -27,6 +28,12 @@ ntfCfgPath = "tests/tmp/cli/etc/opt/simplex-notifications" ntfLogPath :: FilePath ntfLogPath = "tests/tmp/cli/etc/var/simplex-notifications" +fileCfgPath :: FilePath +fileCfgPath = "tests/tmp/cli/etc/opt/simplex-files" + +fileLogPath :: FilePath +fileLogPath = "tests/tmp/cli/etc/var/simplex-files" + cliTests :: Spec cliTests = do describe "SMP server CLI" $ do @@ -38,6 +45,9 @@ cliTests = do describe "Ntf server CLI" $ do it "should initialize, start and delete the server (no store log)" $ ntfServerTest False it "should initialize, start and delete the server (with store log)" $ ntfServerTest True + describe "XFTP server CLI" $ do + it "should initialize, start and delete the server (no store log)" $ xftpServerTest False + it "should initialize, start and delete the server (with store log)" $ xftpServerTest True smpServerTest :: Bool -> Bool -> IO () smpServerTest storeLog basicAuth = do @@ -78,3 +88,20 @@ ntfServerTest storeLog = do capture_ (withStdin "Y" . withArgs ["delete"] $ ntfServerCLI ntfCfgPath ntfLogPath) >>= (`shouldSatisfy` ("WARNING: deleting the server will make all queues inaccessible" `isPrefixOf`)) doesFileExist (cfgPath <> "/ca.key") `shouldReturn` False + +xftpServerTest :: Bool -> IO () +xftpServerTest storeLog = do + capture_ (withArgs (["init", "-p tests/tmp"] <> ["-l" | storeLog]) $ xftpServerCLI fileCfgPath fileLogPath) + >>= (`shouldSatisfy` (("Server initialized, you can modify configuration in " <> fileCfgPath <> "/file-server.ini") `isPrefixOf`)) + Right ini <- readIniFile $ fileCfgPath <> "/file-server.ini" + lookupValue "STORE_LOG" "enable" ini `shouldBe` Right (if storeLog then "on" else "off") + lookupValue "STORE_LOG" "log_stats" ini `shouldBe` Right "off" + lookupValue "TRANSPORT" "port" ini `shouldBe` Right "443" + doesFileExist (fileCfgPath <> "/ca.key") `shouldReturn` True + r <- lines <$> capture_ (withArgs ["start"] $ (100000 `timeout` xftpServerCLI fileCfgPath fileLogPath) `catchAll_` pure (Just ())) + r `shouldContain` ["SimpleX XFTP server v" <> xftpServerVersion] + r `shouldContain` (if storeLog then ["Store log: " <> fileLogPath <> "/file-server-store.log"] else ["Store log disabled."]) + r `shouldContain` ["Listening on port 443..."] + capture_ (withStdin "Y" . withArgs ["delete"] $ xftpServerCLI fileCfgPath fileLogPath) + >>= (`shouldSatisfy` ("WARNING: deleting the server will make all queues inaccessible" `isPrefixOf`)) + doesFileExist (cfgPath <> "/ca.key") `shouldReturn` False diff --git a/tests/NtfClient.hs b/tests/NtfClient.hs index d3c10c80e..f0a9a1d47 100644 --- a/tests/NtfClient.hs +++ b/tests/NtfClient.hs @@ -30,6 +30,7 @@ import Network.HTTP.Types (Status) import qualified Network.HTTP.Types as N import qualified Network.HTTP2.Server as H import Network.Socket +import SMPClient (serverBracket) import Simplex.Messaging.Client (chooseTransportHost, defaultNetworkConfig) import Simplex.Messaging.Client.Agent (defaultSMPClientAgentConfig) import qualified Simplex.Messaging.Crypto as C @@ -48,7 +49,6 @@ import UnliftIO.Async import UnliftIO.Concurrent import qualified UnliftIO.Exception as E import UnliftIO.STM -import UnliftIO.Timeout (timeout) testHost :: NonEmpty TransportHost testHost = "localhost" @@ -115,19 +115,6 @@ withNtfServerCfg t cfg = (\started -> runNtfServerBlocking started cfg {transports = [(ntfTestPort, t)]}) (pure ()) -serverBracket :: MonadUnliftIO m => (TMVar Bool -> m ()) -> m () -> (ThreadId -> m a) -> m a -serverBracket process afterProcess f = do - started <- newEmptyTMVarIO - E.bracket - (forkIOWithUnmask ($ process started)) - (\t -> killThread t >> afterProcess >> waitFor started "stop") - (\t -> waitFor started "start" >> f t) - where - waitFor started s = - 5_000_000 `timeout` atomically (takeTMVar started) >>= \case - Nothing -> error $ "server did not " <> s - _ -> pure () - withNtfServerOn :: ATransport -> ServiceName -> IO a -> IO a withNtfServerOn t port' = withNtfServerThreadOn t port' . const diff --git a/tests/Test.hs b/tests/Test.hs index 3bf372102..45d3afbb0 100644 --- a/tests/Test.hs +++ b/tests/Test.hs @@ -16,6 +16,7 @@ import Simplex.Messaging.Transport.WebSockets (WS) import System.Directory (createDirectoryIfMissing, removeDirectoryRecursive) import System.Environment (setEnv) import Test.Hspec +import XFTPServerTests (xftpServerTests) logCfg :: LogConfig logCfg = LogConfig {lc_file = Nothing, lc_stderr = True} @@ -40,5 +41,7 @@ main = do describe "SMP server via WebSockets" $ serverTests (transport @WS) describe "Notifications server" $ ntfServerTests (transport @TLS) describe "SMP client agent" $ agentTests (transport @TLS) + describe "XFTP" $ do + describe "XFTP server" xftpServerTests + describe "XFTP file description" fileDescriptionTests describe "Server CLIs" cliTests - describe "File description" fileDescriptionTests diff --git a/tests/XFTPClient.hs b/tests/XFTPClient.hs new file mode 100644 index 000000000..c41a20e1c --- /dev/null +++ b/tests/XFTPClient.hs @@ -0,0 +1,61 @@ +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RankNTypes #-} + +module XFTPClient where + +import Control.Concurrent (ThreadId) +import Network.Socket (ServiceName) +import SMPClient (serverBracket) +import Simplex.FileTransfer.Client +import Simplex.FileTransfer.Server (runXFTPServerBlocking) +import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..)) +import Simplex.Messaging.Protocol (XFTPServer) +import Test.Hspec + +xftpTest :: HasCallStack => (HasCallStack => XFTPClient -> IO ()) -> Expectation +xftpTest test = runXFTPTest test `shouldReturn` () + +runXFTPTest :: HasCallStack => (HasCallStack => XFTPClient -> IO a) -> IO a +runXFTPTest test = withXFTPServer $ testXFTPClient test + +withXFTPServerCfg :: HasCallStack => XFTPServerConfig -> (HasCallStack => ThreadId -> IO a) -> IO a +withXFTPServerCfg cfg = + serverBracket + (`runXFTPServerBlocking` cfg) + (pure ()) + +withXFTPServer :: IO a -> IO a +withXFTPServer = withXFTPServerCfg testXFTPServerConfig . const + +xftpTestPort :: ServiceName +xftpTestPort = "7000" + +testXFTPServer :: XFTPServer +testXFTPServer = "xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7000" + +testXFTPServerConfig :: XFTPServerConfig +testXFTPServerConfig = + XFTPServerConfig + { xftpPort = xftpTestPort, + fileIdSize = 16, + storeLogFile = Nothing, + filesPath = "tests/xftp-files", + caCertificateFile = "tests/fixtures/ca.crt", + privateKeyFile = "tests/fixtures/server.key", + certificateFile = "tests/fixtures/server.crt", + logStatsInterval = Nothing, + logStatsStartTime = 0, + serverStatsLogFile = "tests/xftp-server-stats.daily.log", + serverStatsBackupFile = Nothing, + logTLSErrors = True + } + +testXFTPClientConfig :: XFTPClientConfig +testXFTPClientConfig = defaultXFTPClientConfig + +testXFTPClient :: HasCallStack => (HasCallStack => XFTPClient -> IO a) -> IO a +testXFTPClient client = + getXFTPClient (1, testXFTPServer, Nothing) testXFTPClientConfig (pure ()) >>= \case + Right c -> client c + Left e -> error $ show e diff --git a/tests/XFTPServerTests.hs b/tests/XFTPServerTests.hs new file mode 100644 index 000000000..9ba61c51a --- /dev/null +++ b/tests/XFTPServerTests.hs @@ -0,0 +1,57 @@ +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedLists #-} +{-# LANGUAGE OverloadedStrings #-} + +module XFTPServerTests where + +import AgentTests.FunctionalAPITests (runRight_) +import Control.Monad.IO.Class (liftIO) +import Crypto.Random (getRandomBytes) +import qualified Data.ByteString.Base64.URL as B64 +import Data.ByteString.Char8 (ByteString) +import qualified Data.ByteString.Char8 as B +import Simplex.FileTransfer.Client +import Simplex.FileTransfer.Protocol (FileInfo (..)) +import qualified Simplex.Messaging.Crypto as C +import Simplex.Messaging.Protocol (SenderId) +import System.Directory (createDirectoryIfMissing, removeDirectoryRecursive) +import System.IO (IOMode (..), withFile) +import Test.Hspec +import XFTPClient + +xftpServerTests :: Spec +xftpServerTests = + before_ (createDirectoryIfMissing False "tests/xftp-files") + . after_ (removeDirectoryRecursive "tests/xftp-files") + $ do + describe "XFTP file chunk delivery" testFileChunkDelivery + +chSize :: Num n => n +chSize = 256 * 1024 + +createTestChunk :: FilePath -> IO ByteString +createTestChunk fp = do + bytes <- getRandomBytes chSize + withFile fp WriteMode $ \h -> B.hPut h bytes + pure bytes + +readChunk :: SenderId -> IO ByteString +readChunk sId = B.readFile ("tests/xftp-files/" <> B.unpack (B64.encode sId)) + +testFileChunkDelivery :: Spec +testFileChunkDelivery = + it "should create, upload and receive file chunk" $ do + (sndKey, spKey) <- C.generateSignatureKeyPair C.SEd25519 + (rcvKey, rpKey) <- C.generateSignatureKeyPair C.SEd25519 + (rDhKey, _rpDhKey) <- C.generateKeyPair' + bytes <- createTestChunk "tests/tmp/chunk1" + xftpTest $ \c -> runRight_ $ do + let file = FileInfo {sndKey, size = chSize, digest = "abc="} + (sId, [rId]) <- createXFTPChunk c spKey file [rcvKey] + uploadXFTPChunk c spKey sId $ XFTPChunkSpec {filePath = "tests/tmp/chunk1", chunkOffset = 0, chunkSize = chSize} + liftIO $ readChunk sId `shouldReturn` bytes + (_sDhKey, chunkBody) <- downloadXFTPChunk c rpKey rId rDhKey + receiveXFTPChunk chunkBody XFTPChunkSpec {filePath = "tests/tmp/received_chunk1", chunkOffset = 0, chunkSize = chSize} + liftIO $ B.readFile "tests/tmp/received_chunk1" `shouldReturn` bytes + pure ()