mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 14:16:00 +00:00
files: server and client spike - basic upload/download (#591)
* Files: main, env, stats, storeLog * Better + transport * Executable * Env * Update Client.hs, Server.hs, and 4 more files... * Answer on request * Delay * Temp file * Bypass cert check * update package.yml, rename * update store log * extend HTTP2 transport * refactor caStore * HTTP2 body * update server stats * file server/client framework * verify server commands * process FNEW command, CLI test works * simple XFTP server test (fails) * fix test, refactor * upload chunk works * receive file chunk in the client * remove transport handshake * typo Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> * fix names --------- Co-authored-by: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
d7a008985b
commit
bccef0ba47
18
apps/xftp-server/Main.hs
Normal file
18
apps/xftp-server/Main.hs
Normal file
@@ -0,0 +1,18 @@
|
||||
module Main where
|
||||
|
||||
import Control.Logger.Simple
|
||||
import Simplex.FileTransfer.Server.Main
|
||||
|
||||
cfgPath :: FilePath
|
||||
cfgPath = "/etc/opt/simplex-xftp"
|
||||
|
||||
logPath :: FilePath
|
||||
logPath = "/var/opt/simplex-xftp"
|
||||
|
||||
logCfg :: LogConfig
|
||||
logCfg = LogConfig {lc_file = Nothing, lc_stderr = True}
|
||||
|
||||
main :: IO ()
|
||||
main = do
|
||||
setLogLevel LogDebug -- change to LogError in production
|
||||
withGlobalLogging logCfg $ xftpServerCLI cfgPath logPath
|
||||
@@ -12,6 +12,11 @@ source-repository-package
|
||||
location: https://github.com/simplex-chat/hs-socks.git
|
||||
tag: a30cc7a79a08d8108316094f8f2f82a0c5e1ac51
|
||||
|
||||
source-repository-package
|
||||
type: git
|
||||
location: https://github.com/kazu-yamamoto/http2.git
|
||||
tag: 1136bb126636789cec197e5d0bae39aa63c6f9e5
|
||||
|
||||
source-repository-package
|
||||
type: git
|
||||
location: https://github.com/simplex-chat/direct-sqlcipher.git
|
||||
|
||||
10
package.yaml
10
package.yaml
@@ -42,7 +42,7 @@ dependencies:
|
||||
- directory == 1.3.*
|
||||
- filepath == 1.4.*
|
||||
- http-types == 0.12.*
|
||||
- http2 == 3.0.*
|
||||
- http2 == 4.0.*
|
||||
- generic-random >= 1.3 && < 1.5
|
||||
- ini == 0.4.1
|
||||
- iso8601-time == 0.1.*
|
||||
@@ -104,6 +104,14 @@ executables:
|
||||
ghc-options:
|
||||
- -threaded
|
||||
|
||||
xftp-server:
|
||||
source-dirs: apps/xftp-server
|
||||
main: Main.hs
|
||||
dependencies:
|
||||
- simplexmq
|
||||
ghc-options:
|
||||
- -threaded
|
||||
|
||||
smp-agent:
|
||||
source-dirs: apps/smp-agent
|
||||
main: Main.hs
|
||||
|
||||
@@ -132,16 +132,15 @@ File description format (yml):
|
||||
```
|
||||
name: file.ext
|
||||
size: 33200000
|
||||
chunk: 8Mb
|
||||
chunk: 8mb
|
||||
hash: abc=
|
||||
key: abc=
|
||||
iv: abc=
|
||||
part_hashes: [def=, def=, def=, def=]
|
||||
parts:
|
||||
- server: xftp://abc=@example1.com
|
||||
chunks: [1:abc=:def=:ghi=, 3:abc=:def=:ghi=]
|
||||
- server: xftp://abc=@example2.com
|
||||
chunks: [2:abc=:def=:ghi=, 4:abc=:def=:ghi=]
|
||||
chunks: [2:abc=:def=:ghi=, 4:abc=:def=:ghi=:2mb]
|
||||
- server: xftp://abc=@example3.com
|
||||
chunks: [1:abc=:def=, 4:abc=:def=]
|
||||
- server: xftp://abc=@example4.com
|
||||
|
||||
@@ -40,9 +40,12 @@ library
|
||||
Simplex.FileTransfer.Description
|
||||
Simplex.FileTransfer.Protocol
|
||||
Simplex.FileTransfer.Server
|
||||
Simplex.FileTransfer.Server.Env
|
||||
Simplex.FileTransfer.Server.Main
|
||||
Simplex.FileTransfer.Server.Stats
|
||||
Simplex.FileTransfer.Server.Store
|
||||
Simplex.FileTransfer.Server.StoreLog
|
||||
Simplex.FileTransfer.Transport
|
||||
Simplex.Messaging.Agent
|
||||
Simplex.Messaging.Agent.Client
|
||||
Simplex.Messaging.Agent.Env.SQLite
|
||||
@@ -139,7 +142,7 @@ library
|
||||
, filepath ==1.4.*
|
||||
, generic-random >=1.3 && <1.5
|
||||
, http-types ==0.12.*
|
||||
, http2 ==3.0.*
|
||||
, http2 ==4.0.*
|
||||
, ini ==0.4.1
|
||||
, iso8601-time ==0.1.*
|
||||
, memory ==0.15.*
|
||||
@@ -201,7 +204,7 @@ executable ntf-server
|
||||
, filepath ==1.4.*
|
||||
, generic-random >=1.3 && <1.5
|
||||
, http-types ==0.12.*
|
||||
, http2 ==3.0.*
|
||||
, http2 ==4.0.*
|
||||
, ini ==0.4.1
|
||||
, iso8601-time ==0.1.*
|
||||
, memory ==0.15.*
|
||||
@@ -264,7 +267,7 @@ executable smp-agent
|
||||
, filepath ==1.4.*
|
||||
, generic-random >=1.3 && <1.5
|
||||
, http-types ==0.12.*
|
||||
, http2 ==3.0.*
|
||||
, http2 ==4.0.*
|
||||
, ini ==0.4.1
|
||||
, iso8601-time ==0.1.*
|
||||
, memory ==0.15.*
|
||||
@@ -327,7 +330,7 @@ executable smp-server
|
||||
, filepath ==1.4.*
|
||||
, generic-random >=1.3 && <1.5
|
||||
, http-types ==0.12.*
|
||||
, http2 ==3.0.*
|
||||
, http2 ==4.0.*
|
||||
, ini ==0.4.1
|
||||
, iso8601-time ==0.1.*
|
||||
, memory ==0.15.*
|
||||
@@ -360,6 +363,68 @@ executable smp-server
|
||||
if flag(swift)
|
||||
cpp-options: -DswiftJSON
|
||||
|
||||
executable xftp-server
|
||||
main-is: Main.hs
|
||||
other-modules:
|
||||
Paths_simplexmq
|
||||
hs-source-dirs:
|
||||
apps/xftp-server
|
||||
ghc-options: -Wall -Wcompat -Werror=incomplete-patterns -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-uni-patterns -Wunused-type-patterns -threaded
|
||||
build-depends:
|
||||
QuickCheck ==2.14.*
|
||||
, aeson ==2.0.*
|
||||
, ansi-terminal >=0.10 && <0.12
|
||||
, asn1-encoding ==0.9.*
|
||||
, asn1-types ==0.3.*
|
||||
, async ==2.2.*
|
||||
, attoparsec ==0.14.*
|
||||
, base >=4.14 && <5
|
||||
, base64-bytestring >=1.0 && <1.3
|
||||
, bytestring ==0.10.*
|
||||
, case-insensitive ==1.2.*
|
||||
, composition ==1.0.*
|
||||
, constraints >=0.12 && <0.14
|
||||
, containers ==0.6.*
|
||||
, cryptonite >=0.27 && <0.30
|
||||
, cryptostore ==0.2.*
|
||||
, data-default ==0.7.*
|
||||
, direct-sqlcipher ==2.3.*
|
||||
, directory ==1.3.*
|
||||
, filepath ==1.4.*
|
||||
, generic-random >=1.3 && <1.5
|
||||
, http-types ==0.12.*
|
||||
, http2 ==4.0.*
|
||||
, ini ==0.4.1
|
||||
, iso8601-time ==0.1.*
|
||||
, memory ==0.15.*
|
||||
, mtl ==2.2.*
|
||||
, network >=3.1.2.7 && <3.2
|
||||
, network-transport ==0.5.4
|
||||
, optparse-applicative >=0.15 && <0.17
|
||||
, process ==1.6.*
|
||||
, random >=1.1 && <1.3
|
||||
, simple-logger ==0.1.*
|
||||
, simplexmq
|
||||
, socks ==0.6.*
|
||||
, sqlcipher-simple ==0.4.*
|
||||
, stm ==2.5.*
|
||||
, template-haskell ==2.16.*
|
||||
, text ==1.2.*
|
||||
, time ==1.9.*
|
||||
, time-compat ==1.9.*
|
||||
, time-manager ==0.0.*
|
||||
, tls >=1.6.0 && <1.7
|
||||
, transformers ==0.5.*
|
||||
, unliftio ==0.2.*
|
||||
, unliftio-core ==0.2.*
|
||||
, websockets ==0.12.*
|
||||
, x509 ==1.7.*
|
||||
, x509-store ==1.6.*
|
||||
, x509-validation ==1.6.*
|
||||
default-language: Haskell2010
|
||||
if flag(swift)
|
||||
cpp-options: -DswiftJSON
|
||||
|
||||
test-suite smp-server-test
|
||||
type: exitcode-stdio-1.0
|
||||
main-is: Test.hs
|
||||
@@ -383,6 +448,8 @@ test-suite smp-server-test
|
||||
ServerTests
|
||||
SMPAgentClient
|
||||
SMPClient
|
||||
XFTPClient
|
||||
XFTPServerTests
|
||||
Paths_simplexmq
|
||||
hs-source-dirs:
|
||||
tests
|
||||
@@ -413,7 +480,7 @@ test-suite smp-server-test
|
||||
, hspec ==2.7.*
|
||||
, hspec-core ==2.7.*
|
||||
, http-types ==0.12.*
|
||||
, http2 ==3.0.*
|
||||
, http2 ==4.0.*
|
||||
, ini ==0.4.1
|
||||
, iso8601-time ==0.1.*
|
||||
, main-tester ==0.2.*
|
||||
|
||||
@@ -1 +1,160 @@
|
||||
{-# LANGUAGE BlockArguments #-}
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
|
||||
module Simplex.FileTransfer.Client where
|
||||
|
||||
import Control.Monad.Except
|
||||
import Data.Bifunctor (first)
|
||||
import Data.ByteString.Builder (Builder, byteString)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Int (Int64)
|
||||
import Data.List.NonEmpty (NonEmpty (..))
|
||||
import qualified Network.HTTP.Types as N
|
||||
import qualified Network.HTTP2.Client as H
|
||||
import Simplex.FileTransfer.Protocol
|
||||
import Simplex.FileTransfer.Transport (receiveFile, sendFile)
|
||||
import Simplex.Messaging.Client
|
||||
( NetworkConfig (..),
|
||||
ProtocolClientError (..),
|
||||
TransportSession,
|
||||
chooseTransportHost,
|
||||
defaultNetworkConfig,
|
||||
proxyUsername,
|
||||
transportClientConfig,
|
||||
)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Protocol
|
||||
( ErrorType (..),
|
||||
ProtocolServer (ProtocolServer),
|
||||
RcvPublicDhKey,
|
||||
RecipientId,
|
||||
SenderId,
|
||||
)
|
||||
import Simplex.Messaging.Transport (supportedParameters)
|
||||
import Simplex.Messaging.Transport.Client (TransportClientConfig)
|
||||
import Simplex.Messaging.Transport.HTTP2
|
||||
import Simplex.Messaging.Transport.HTTP2.Client
|
||||
import Simplex.Messaging.Util (bshow, liftEitherError)
|
||||
import System.IO (IOMode (..), SeekMode (..))
|
||||
import UnliftIO.IO (hSeek, withFile)
|
||||
|
||||
data XFTPClient = XFTPClient
|
||||
{ http2Client :: HTTP2Client,
|
||||
config :: XFTPClientConfig
|
||||
}
|
||||
|
||||
data XFTPClientConfig = XFTPClientConfig
|
||||
{ networkConfig :: NetworkConfig
|
||||
}
|
||||
|
||||
data XFTPChunkBody = XFTPChunkBody
|
||||
{ chunkSize :: Int,
|
||||
chunkPart :: Int -> IO ByteString,
|
||||
http2Body :: HTTP2Body
|
||||
}
|
||||
|
||||
data XFTPChunkSpec = XFTPChunkSpec
|
||||
{ filePath :: FilePath,
|
||||
chunkOffset :: Int64,
|
||||
chunkSize :: Int
|
||||
}
|
||||
|
||||
defaultXFTPClientConfig :: XFTPClientConfig
|
||||
defaultXFTPClientConfig = XFTPClientConfig {networkConfig = defaultNetworkConfig}
|
||||
|
||||
getXFTPClient :: TransportSession FileResponse -> XFTPClientConfig -> IO () -> IO (Either ProtocolClientError XFTPClient)
|
||||
getXFTPClient transportSession@(_, srv, _) config@XFTPClientConfig {networkConfig} disconnected = runExceptT $ do
|
||||
let tcConfig = transportClientConfig networkConfig
|
||||
http2Config = xftpHTTP2Config tcConfig config
|
||||
username = proxyUsername transportSession
|
||||
ProtocolServer _ host port keyHash = srv
|
||||
useHost <- liftEither $ chooseTransportHost networkConfig host
|
||||
http2Client <- liftEitherError xftpClientError $ getVerifiedHTTP2Client (Just username) useHost port (Just keyHash) Nothing http2Config disconnected
|
||||
pure XFTPClient {http2Client, config}
|
||||
|
||||
xftpHTTP2Config :: TransportClientConfig -> XFTPClientConfig -> HTTP2ClientConfig
|
||||
xftpHTTP2Config transportConfig XFTPClientConfig {networkConfig = NetworkConfig {tcpConnectTimeout}} =
|
||||
defaultHTTP2ClientConfig
|
||||
{ bodyHeadSize = xftpBlockSize,
|
||||
suportedTLSParams = supportedParameters,
|
||||
connTimeout = tcpConnectTimeout,
|
||||
transportConfig
|
||||
}
|
||||
|
||||
xftpClientError :: HTTP2ClientError -> ProtocolClientError
|
||||
xftpClientError = \case
|
||||
HCResponseTimeout -> PCEResponseTimeout
|
||||
HCNetworkError -> PCENetworkError
|
||||
HCIOError e -> PCEIOError e
|
||||
|
||||
sendXFTPCommand :: forall p. FilePartyI p => XFTPClient -> C.APrivateSignKey -> XFTPFileId -> FileCommand p -> Maybe XFTPChunkSpec -> ExceptT ProtocolClientError IO (FileResponse, HTTP2Body)
|
||||
sendXFTPCommand XFTPClient {http2Client = http2@HTTP2Client {sessionId}} pKey fId cmd chunkSpec_ = do
|
||||
t <-
|
||||
liftEither . first PCETransportError $
|
||||
xftpEncodeTransmission sessionId (Just pKey) ("", fId, FileCmd (sFileParty @p) cmd)
|
||||
let req = H.requestStreaming N.methodPost "/" [] $ streamBody t
|
||||
HTTP2Response {respBody = body@HTTP2Body {bodyHead}} <- liftEitherError xftpClientError $ sendRequestDirect http2 req
|
||||
when (B.length bodyHead /= xftpBlockSize) $ throwError $ PCEResponseError BLOCK
|
||||
-- TODO validate that the file ID is the same as in the request?
|
||||
(_, _, (_, _fId, respOrErr)) <- liftEither . first PCEResponseError $ xftpDecodeTransmission sessionId bodyHead
|
||||
case respOrErr of
|
||||
Right r -> pure (r, body)
|
||||
Left e -> throwError $ PCEResponseError e
|
||||
where
|
||||
streamBody :: ByteString -> (Builder -> IO ()) -> IO () -> IO ()
|
||||
streamBody t send done = do
|
||||
send $ byteString t
|
||||
forM_ chunkSpec_ $ \XFTPChunkSpec {filePath, chunkOffset, chunkSize} ->
|
||||
withFile filePath ReadMode $ \h -> do
|
||||
hSeek h AbsoluteSeek $ fromIntegral chunkOffset
|
||||
sendFile h send chunkSize
|
||||
done
|
||||
|
||||
createXFTPChunk ::
|
||||
XFTPClient ->
|
||||
C.APrivateSignKey ->
|
||||
FileInfo ->
|
||||
NonEmpty C.APublicVerifyKey ->
|
||||
ExceptT ProtocolClientError IO (SenderId, NonEmpty RecipientId)
|
||||
createXFTPChunk c spKey file rsps =
|
||||
sendXFTPCommand c spKey "" (FNEW file rsps) Nothing >>= \case
|
||||
-- TODO check that body is empty
|
||||
(FRSndIds sId rIds, _body) -> pure (sId, rIds)
|
||||
(r, _) -> throwError . PCEUnexpectedResponse $ bshow r
|
||||
|
||||
uploadXFTPChunk :: XFTPClient -> C.APrivateSignKey -> XFTPFileId -> XFTPChunkSpec -> ExceptT ProtocolClientError IO ()
|
||||
uploadXFTPChunk c spKey fId chunkSpec =
|
||||
sendXFTPCommand c spKey fId FPUT (Just chunkSpec) >>= \case
|
||||
-- TODO check that body is empty
|
||||
(FROk, _body) -> pure ()
|
||||
(r, _) -> throwError . PCEUnexpectedResponse $ bshow r
|
||||
|
||||
downloadXFTPChunk :: XFTPClient -> C.APrivateSignKey -> XFTPFileId -> RcvPublicDhKey -> ExceptT ProtocolClientError IO (RcvPublicDhKey, XFTPChunkBody)
|
||||
downloadXFTPChunk c rpKey fId rKey =
|
||||
sendXFTPCommand c rpKey fId (FGET rKey) Nothing >>= \case
|
||||
(FRFile sKey, http2Body@HTTP2Body {bodyHead, bodySize, bodyPart}) -> case bodyPart of
|
||||
-- TODO atm bodySize is set to 0, so chunkSize will be incorrect
|
||||
Just chunkPart -> do
|
||||
let chunk = XFTPChunkBody {chunkSize = bodySize - B.length bodyHead, chunkPart, http2Body}
|
||||
pure (sKey, chunk)
|
||||
_ -> throwError $ PCEResponseError NO_MSG
|
||||
(r, _) -> throwError . PCEUnexpectedResponse $ bshow r
|
||||
|
||||
receiveXFTPChunk :: XFTPChunkBody -> XFTPChunkSpec -> ExceptT ProtocolClientError IO ()
|
||||
receiveXFTPChunk XFTPChunkBody {chunkPart} XFTPChunkSpec {filePath, chunkOffset, chunkSize} = liftIO $ do
|
||||
withFile filePath WriteMode $ \h -> do
|
||||
hSeek h AbsoluteSeek $ fromIntegral chunkOffset
|
||||
-- TODO chunk decryption
|
||||
void $ receiveFile h chunkPart 0
|
||||
|
||||
-- FADD :: NonEmpty RcvPublicVerifyKey -> FileCommand Sender
|
||||
-- FDEL :: FileCommand Sender
|
||||
-- FACK :: FileCommand Recipient
|
||||
-- PING :: FileCommand Recipient
|
||||
|
||||
@@ -11,14 +11,17 @@
|
||||
|
||||
module Simplex.FileTransfer.Protocol where
|
||||
|
||||
import Data.Bifunctor (first)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Kind (Type)
|
||||
import Data.List.NonEmpty (NonEmpty)
|
||||
import Data.Maybe (isJust, isNothing)
|
||||
import Data.List.NonEmpty (NonEmpty (..))
|
||||
import Data.Maybe (isNothing)
|
||||
import Data.Type.Equality
|
||||
import Data.Word (Word32)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Notifications.Transport (ntfClientHandshake)
|
||||
import Simplex.Messaging.Protocol
|
||||
( CommandError (..),
|
||||
@@ -29,11 +32,29 @@ import Simplex.Messaging.Protocol
|
||||
ProtocolType (..),
|
||||
RcvPublicDhKey,
|
||||
RcvPublicVerifyKey,
|
||||
RecipientId,
|
||||
SenderId,
|
||||
SentRawTransmission,
|
||||
SignedTransmission,
|
||||
SndPublicVerifyKey,
|
||||
Transmission,
|
||||
encodeTransmission,
|
||||
messageTagP,
|
||||
tDecodeParseValidate,
|
||||
tEncode,
|
||||
tEncodeBatch,
|
||||
tParse,
|
||||
_smpP,
|
||||
)
|
||||
import Simplex.Messaging.Transport (SessionId, TransportError (..))
|
||||
import Simplex.Messaging.Util ((<$?>))
|
||||
import Simplex.Messaging.Version
|
||||
|
||||
currentXFTPVersion :: Version
|
||||
currentXFTPVersion = 1
|
||||
|
||||
xftpBlockSize :: Int
|
||||
xftpBlockSize = 16384
|
||||
|
||||
-- | File protocol clients
|
||||
data FileParty = Recipient | Sender
|
||||
@@ -129,6 +150,8 @@ data FileInfo = FileInfo
|
||||
}
|
||||
deriving (Eq, Show)
|
||||
|
||||
type XFTPFileId = ByteString
|
||||
|
||||
instance FilePartyI p => ProtocolEncoding (FileCommand p) where
|
||||
type Tag (FileCommand p) = FileCommandTag p
|
||||
encodeProtocol _v = \case
|
||||
@@ -145,14 +168,18 @@ instance FilePartyI p => ProtocolEncoding (FileCommand p) where
|
||||
|
||||
protocolP v tag = (\(FileCmd _ c) -> checkParty c) <$?> protocolP v (FCT (sFileParty @p) tag)
|
||||
|
||||
checkCredentials (sig, _, chunkId, _) cmd = case cmd of
|
||||
checkCredentials (sig, _, fileId, _) cmd = case cmd of
|
||||
-- FNEW must not have signature and chunk ID
|
||||
FNEW {}
|
||||
| isJust sig || not (B.null chunkId) -> Left $ CMD HAS_AUTH
|
||||
| isNothing sig -> Left $ CMD NO_AUTH
|
||||
| not (B.null fileId) -> Left $ CMD HAS_AUTH
|
||||
| otherwise -> Right cmd
|
||||
PING
|
||||
| isNothing sig && B.null fileId -> Right cmd
|
||||
| otherwise -> Left $ CMD HAS_AUTH
|
||||
-- other client commands must have both signature and queue ID
|
||||
_
|
||||
| isNothing sig || B.null chunkId -> Left $ CMD NO_AUTH
|
||||
| isNothing sig || B.null fileId -> Left $ CMD NO_AUTH
|
||||
| otherwise -> Right cmd
|
||||
|
||||
instance ProtocolEncoding FileCmd where
|
||||
@@ -178,9 +205,14 @@ instance Encoding FileInfo where
|
||||
smpEncode FileInfo {sndKey, size, digest} = smpEncode (sndKey, size, digest)
|
||||
smpP = FileInfo <$> smpP <*> smpP <*> smpP
|
||||
|
||||
instance StrEncoding FileInfo where
|
||||
strEncode FileInfo {sndKey, size, digest} = strEncode (sndKey, size, digest)
|
||||
strP = FileInfo <$> strP_ <*> strP_ <*> strP
|
||||
|
||||
data FileResponseTag
|
||||
= FRChunkIds_
|
||||
= FRSndIds_
|
||||
| FRRcvIds_
|
||||
| FRFile_
|
||||
| FROk_
|
||||
| FRErr_
|
||||
| FRPong_
|
||||
@@ -188,8 +220,9 @@ data FileResponseTag
|
||||
|
||||
instance Encoding FileResponseTag where
|
||||
smpEncode = \case
|
||||
FRChunkIds_ -> "CHUNK"
|
||||
FRSndIds_ -> "SIDS"
|
||||
FRRcvIds_ -> "RIDS"
|
||||
FRFile_ -> "FILE"
|
||||
FROk_ -> "OK"
|
||||
FRErr_ -> "ERR"
|
||||
FRPong_ -> "PONG"
|
||||
@@ -197,16 +230,18 @@ instance Encoding FileResponseTag where
|
||||
|
||||
instance ProtocolMsgTag FileResponseTag where
|
||||
decodeTag = \case
|
||||
"CHUNK" -> Just FRChunkIds_
|
||||
"SIDS" -> Just FRSndIds_
|
||||
"RIDS" -> Just FRRcvIds_
|
||||
"FILE" -> Just FRFile_
|
||||
"OK" -> Just FROk_
|
||||
"ERR" -> Just FRErr_
|
||||
"PONG" -> Just FRPong_
|
||||
_ -> Nothing
|
||||
|
||||
data FileResponse
|
||||
= FRChunkIds FileChunkId (NonEmpty FileChunkId)
|
||||
| FRRcvIds (NonEmpty FileChunkId)
|
||||
= FRSndIds SenderId (NonEmpty RecipientId)
|
||||
| FRRcvIds (NonEmpty RecipientId)
|
||||
| FRFile RcvPublicDhKey
|
||||
| FROk
|
||||
| FRErr ErrorType
|
||||
| FRPong
|
||||
@@ -215,8 +250,9 @@ data FileResponse
|
||||
instance ProtocolEncoding FileResponse where
|
||||
type Tag FileResponse = FileResponseTag
|
||||
encodeProtocol _v = \case
|
||||
FRChunkIds chId rIds -> e (FRChunkIds_, ' ', chId, rIds)
|
||||
FRSndIds fId rIds -> e (FRSndIds_, ' ', fId, rIds)
|
||||
FRRcvIds rIds -> e (FRRcvIds_, ' ', rIds)
|
||||
FRFile rKey -> e (FRFile_, ' ', rKey)
|
||||
FROk -> e FROk_
|
||||
FRErr err -> e (FRErr_, ' ', err)
|
||||
FRPong -> e FRPong_
|
||||
@@ -225,14 +261,15 @@ instance ProtocolEncoding FileResponse where
|
||||
e = smpEncode
|
||||
|
||||
protocolP _v = \case
|
||||
FRChunkIds_ -> FRChunkIds <$> _smpP <*> smpP
|
||||
FRSndIds_ -> FRSndIds <$> _smpP <*> smpP
|
||||
FRRcvIds_ -> FRRcvIds <$> _smpP
|
||||
FRFile_ -> FRFile <$> _smpP
|
||||
FROk_ -> pure FROk
|
||||
FRErr_ -> FRErr <$> _smpP
|
||||
FRPong_ -> pure FRPong
|
||||
|
||||
checkCredentials (_, _, entId, _) cmd = case cmd of
|
||||
FRChunkIds {} -> noEntity
|
||||
FRSndIds {} -> noEntity
|
||||
-- ERR response does not always have entity ID
|
||||
FRErr _ -> Right cmd
|
||||
-- PONG response must not have queue ID
|
||||
@@ -246,8 +283,6 @@ instance ProtocolEncoding FileResponse where
|
||||
| B.null entId = Right cmd
|
||||
| otherwise = Left $ CMD HAS_AUTH
|
||||
|
||||
type FileChunkId = ByteString
|
||||
|
||||
checkParty :: forall t p p'. (FilePartyI p, FilePartyI p') => t p' -> Either String (t p)
|
||||
checkParty c = case testEquality (sFileParty @p) (sFileParty @p') of
|
||||
Just Refl -> Right c
|
||||
@@ -257,3 +292,24 @@ checkParty' :: forall t p p'. (FilePartyI p, FilePartyI p') => t p' -> Maybe (t
|
||||
checkParty' c = case testEquality (sFileParty @p) (sFileParty @p') of
|
||||
Just Refl -> Just c
|
||||
_ -> Nothing
|
||||
|
||||
xftpEncodeTransmission :: ProtocolEncoding c => SessionId -> Maybe C.APrivateSignKey -> Transmission c -> Either TransportError ByteString
|
||||
xftpEncodeTransmission sessionId pKey (corrId, fId, msg) = do
|
||||
let t = encodeTransmission currentXFTPVersion sessionId (corrId, fId, msg)
|
||||
xftpEncodeBatch1 $ signTransmission t
|
||||
where
|
||||
signTransmission :: ByteString -> SentRawTransmission
|
||||
signTransmission t = ((`C.sign` t) <$> pKey, t)
|
||||
|
||||
-- this function uses batch syntax but puts only one transmission in the batch
|
||||
xftpEncodeBatch1 :: (Maybe C.ASignature, ByteString) -> Either TransportError ByteString
|
||||
xftpEncodeBatch1 (sig, t) =
|
||||
let t' = tEncodeBatch 1 . smpEncode . Large $ tEncode (sig, t)
|
||||
in first (const TELargeMsg) $ C.pad t' xftpBlockSize
|
||||
|
||||
xftpDecodeTransmission :: ProtocolEncoding c => SessionId -> ByteString -> Either ErrorType (SignedTransmission c)
|
||||
xftpDecodeTransmission sessionId t = do
|
||||
t' <- first (const LARGE_MSG) $ C.unPad t
|
||||
case tParse True t' of
|
||||
t'' :| [] -> Right $ tDecodeParseValidate sessionId currentXFTPVersion t''
|
||||
_ -> Left BLOCK
|
||||
|
||||
@@ -1 +1,268 @@
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE FlexibleContexts #-}
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE NumericUnderscores #-}
|
||||
{-# LANGUAGE OverloadedLists #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
|
||||
module Simplex.FileTransfer.Server where
|
||||
|
||||
import Control.Logger.Simple
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.IO.Unlift (MonadUnliftIO)
|
||||
import Control.Monad.Reader
|
||||
import Crypto.Random (getRandomBytes)
|
||||
import qualified Data.ByteString.Base64.URL as B64
|
||||
import Data.ByteString.Builder (byteString)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Functor (($>))
|
||||
import Data.List (intercalate)
|
||||
import qualified Data.List.NonEmpty as L
|
||||
import qualified Data.Text as T
|
||||
import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime)
|
||||
import Data.Time.Format.ISO8601 (iso8601Show)
|
||||
import Data.Word (Word32)
|
||||
import qualified Network.HTTP.Types as N
|
||||
import qualified Network.HTTP2.Server as H
|
||||
import Simplex.FileTransfer.Protocol
|
||||
import Simplex.FileTransfer.Server.Env
|
||||
import Simplex.FileTransfer.Server.Stats
|
||||
import Simplex.FileTransfer.Server.Store
|
||||
import Simplex.FileTransfer.Transport (receiveFile, sendFile)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Protocol (CorrId, ErrorType (..), RcvPublicDhKey)
|
||||
import Simplex.Messaging.Server (dummyVerifyCmd, verifyCmdSignature)
|
||||
import Simplex.Messaging.Server.Stats
|
||||
import Simplex.Messaging.Server.StoreLog (StoreLog, closeStoreLog)
|
||||
import Simplex.Messaging.Transport.HTTP2
|
||||
import Simplex.Messaging.Transport.HTTP2.Server
|
||||
import Simplex.Messaging.Util
|
||||
import System.Exit (exitFailure)
|
||||
import System.FilePath ((</>))
|
||||
import System.IO (BufferMode (..), hPutStrLn, hSetBuffering)
|
||||
import UnliftIO (IOMode (..), withFile)
|
||||
import UnliftIO.Concurrent (threadDelay)
|
||||
import UnliftIO.Directory (doesFileExist, removeFile, renameFile)
|
||||
import UnliftIO.Exception
|
||||
import UnliftIO.STM
|
||||
|
||||
type M a = ReaderT XFTPEnv IO a
|
||||
|
||||
runXFTPServer :: XFTPServerConfig -> IO ()
|
||||
runXFTPServer cfg = do
|
||||
started <- newEmptyTMVarIO
|
||||
runXFTPServerBlocking started cfg
|
||||
|
||||
runXFTPServerBlocking :: TMVar Bool -> XFTPServerConfig -> IO ()
|
||||
runXFTPServerBlocking started cfg = newXFTPServerEnv cfg >>= runReaderT (xftpServer cfg started)
|
||||
|
||||
xftpServer :: XFTPServerConfig -> TMVar Bool -> M ()
|
||||
xftpServer cfg@XFTPServerConfig {xftpPort, logTLSErrors} started = do
|
||||
restoreServerStats
|
||||
raceAny_ (runServer : serverStatsThread_ cfg) `finally` stopServer
|
||||
where
|
||||
runServer :: M ()
|
||||
runServer = do
|
||||
serverParams <- asks tlsServerParams
|
||||
env <- ask
|
||||
liftIO $
|
||||
runHTTP2Server started xftpPort defaultHTTP2BufferSize serverParams logTLSErrors $ \sessionId r sendResponse -> do
|
||||
reqBody <- getHTTP2Body r xftpBlockSize
|
||||
processRequest HTTP2Request {sessionId, request = r, reqBody, sendResponse} `runReaderT` env
|
||||
|
||||
stopServer :: M ()
|
||||
stopServer = do
|
||||
withFileLog closeStoreLog
|
||||
saveServerStats
|
||||
|
||||
serverStatsThread_ :: XFTPServerConfig -> [M ()]
|
||||
serverStatsThread_ XFTPServerConfig {logStatsInterval = Just interval, logStatsStartTime, serverStatsLogFile} =
|
||||
[logServerStats logStatsStartTime interval serverStatsLogFile]
|
||||
serverStatsThread_ _ = []
|
||||
|
||||
logServerStats :: Int -> Int -> FilePath -> M ()
|
||||
logServerStats startAt logInterval statsFilePath = do
|
||||
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
|
||||
liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath
|
||||
threadDelay $ 1_000_000 * (initialDelay + if initialDelay < 0 then 86_400 else 0)
|
||||
FileServerStats {fromTime, filesCreated, fileRecipients, filesUploaded, filesDeleted, filesDownloaded, fileDownloads, fileDownloadAcks} <- asks serverStats
|
||||
let interval = 1_000_000 * logInterval
|
||||
forever $ do
|
||||
withFile statsFilePath AppendMode $ \h -> liftIO $ do
|
||||
hSetBuffering h LineBuffering
|
||||
ts <- getCurrentTime
|
||||
fromTime' <- atomically $ swapTVar fromTime ts
|
||||
filesCreated' <- atomically $ swapTVar filesCreated 0
|
||||
fileRecipients' <- atomically $ swapTVar fileRecipients 0
|
||||
filesUploaded' <- atomically $ swapTVar filesUploaded 0
|
||||
filesDeleted' <- atomically $ swapTVar filesDeleted 0
|
||||
files <- atomically $ periodStatCounts filesDownloaded ts
|
||||
fileDownloads' <- atomically $ swapTVar fileDownloads 0
|
||||
fileDownloadAcks' <- atomically $ swapTVar fileDownloadAcks 0
|
||||
hPutStrLn h $
|
||||
intercalate
|
||||
","
|
||||
[ iso8601Show $ utctDay fromTime',
|
||||
show filesCreated',
|
||||
show fileRecipients',
|
||||
show filesUploaded',
|
||||
show filesDeleted',
|
||||
dayCount files,
|
||||
weekCount files,
|
||||
monthCount files,
|
||||
show fileDownloads',
|
||||
show fileDownloadAcks'
|
||||
]
|
||||
threadDelay interval
|
||||
|
||||
-- TODO add client DH secret
|
||||
data ServerFile = ServerFile
|
||||
{ filePath :: FilePath,
|
||||
fileSize :: Word32,
|
||||
fileDhSecret :: C.DhSecretX25519
|
||||
}
|
||||
|
||||
processRequest :: HTTP2Request -> M ()
|
||||
processRequest HTTP2Request {sessionId, reqBody = body@HTTP2Body {bodyHead}, sendResponse}
|
||||
| B.length bodyHead /= xftpBlockSize = sendXFTPResponse ("", "", FRErr BLOCK) Nothing
|
||||
| otherwise = do
|
||||
case xftpDecodeTransmission sessionId bodyHead of
|
||||
Right (sig_, signed, (corrId, fId, cmdOrErr)) -> do
|
||||
case cmdOrErr of
|
||||
Right cmd -> do
|
||||
verifyXFTPTransmission sig_ signed fId cmd >>= \case
|
||||
VRVerified req -> uncurry send =<< processXFTPRequest body req
|
||||
VRFailed -> send (FRErr AUTH) Nothing
|
||||
Left e -> send (FRErr e) Nothing
|
||||
where
|
||||
send resp = sendXFTPResponse (corrId, fId, resp)
|
||||
Left e -> sendXFTPResponse ("", "", FRErr e) Nothing
|
||||
where
|
||||
sendXFTPResponse :: (CorrId, XFTPFileId, FileResponse) -> Maybe ServerFile -> M ()
|
||||
sendXFTPResponse (corrId, fId, resp) serverFile_ = do
|
||||
-- liftIO . sendResponse . H.responseBuilder N.ok200 [] . byteString $
|
||||
let t_ = xftpEncodeTransmission sessionId Nothing (corrId, fId, resp)
|
||||
liftIO $ sendResponse $ H.responseStreaming N.ok200 [] $ streamBody t_
|
||||
where
|
||||
streamBody t_ send done = do
|
||||
case t_ of
|
||||
Left _ -> send "padding error" -- TODO respond with BLOCK error?
|
||||
Right t -> do
|
||||
send $ byteString t
|
||||
-- TODO chunk encryption
|
||||
forM_ serverFile_ $ \ServerFile {filePath, fileSize, fileDhSecret} -> do
|
||||
withFile filePath ReadMode $ \h -> sendFile h send $ fromIntegral fileSize
|
||||
done
|
||||
|
||||
data VerificationResult = VRVerified XFTPRequest | VRFailed
|
||||
|
||||
verifyXFTPTransmission :: Maybe C.ASignature -> ByteString -> XFTPFileId -> FileCmd -> M VerificationResult
|
||||
verifyXFTPTransmission sig_ signed fId cmd =
|
||||
case cmd of
|
||||
FileCmd SSender (FNEW file rcps) -> pure $ XFTPReqNew file rcps `verifyWith` sndKey file
|
||||
FileCmd SRecipient PING -> pure $ VRVerified XFTPReqPing
|
||||
FileCmd party _ -> verifyCmd party
|
||||
where
|
||||
verifyCmd :: SFileParty p -> M VerificationResult
|
||||
verifyCmd party = do
|
||||
st <- asks store
|
||||
atomically $ verify <$> getFile st party fId
|
||||
where
|
||||
verify = \case
|
||||
Right (fr, k) -> XFTPReqCmd fr cmd `verifyWith` k
|
||||
_ -> maybe False (dummyVerifyCmd signed) sig_ `seq` VRFailed
|
||||
req `verifyWith` k = if verifyCmdSignature sig_ signed k then VRVerified req else VRFailed
|
||||
|
||||
processXFTPRequest :: HTTP2Body -> XFTPRequest -> M (FileResponse, Maybe ServerFile)
|
||||
processXFTPRequest HTTP2Body {bodyPart} = \case
|
||||
XFTPReqNew file rcps -> do
|
||||
st <- asks store
|
||||
-- TODO validate body empty
|
||||
-- TODO retry on duplicate IDs?
|
||||
sId <- getFileId
|
||||
rIds <- mapM (const getFileId) rcps
|
||||
r <- runExceptT $ do
|
||||
ExceptT $ atomically $ addFile st sId file
|
||||
forM (L.zip rIds rcps) $ \rcp ->
|
||||
ExceptT $ atomically $ addRecipient st sId rcp
|
||||
noFile $ either FRErr (const $ FRSndIds sId rIds) r
|
||||
XFTPReqCmd fr (FileCmd _ cmd) -> case cmd of
|
||||
FADD _rcps -> noFile FROk
|
||||
FPUT -> (,Nothing) <$> receiveServerFile fr
|
||||
FDEL -> noFile FROk
|
||||
FGET dhKey -> sendServerFile fr dhKey
|
||||
FACK -> noFile FROk
|
||||
-- it should never get to the options below, they are passed in other constructors of XFTPRequest
|
||||
FNEW _ _ -> noFile $ FRErr INTERNAL
|
||||
PING -> noFile FRPong
|
||||
XFTPReqPing -> noFile FRPong
|
||||
where
|
||||
noFile resp = pure (resp, Nothing)
|
||||
receiveServerFile :: FileRec -> M FileResponse
|
||||
receiveServerFile FileRec {senderId, fileInfo, filePath} = case bodyPart of
|
||||
Nothing -> pure $ FRErr QUOTA -- TODO file specific errors?
|
||||
Just getBody -> do
|
||||
-- TODO validate body size before downloading, once it's populated
|
||||
path <- asks $ filesPath . config
|
||||
let fPath = path </> B.unpack (B64.encode senderId)
|
||||
FileInfo {size, digest} = fileInfo
|
||||
size' <- liftIO . withFile fPath WriteMode $ \h -> receiveFile h getBody 0
|
||||
if size' == fromIntegral size -- TODO check digest
|
||||
then atomically $ writeTVar filePath (Just fPath) $> FROk
|
||||
else whenM (doesFileExist fPath) (removeFile fPath) $> FRErr QUOTA
|
||||
sendServerFile :: FileRec -> RcvPublicDhKey -> M (FileResponse, Maybe ServerFile)
|
||||
sendServerFile FileRec {filePath, fileInfo = FileInfo {size}} rKey = do
|
||||
readTVarIO filePath >>= \case
|
||||
Just path -> do
|
||||
(sKey, spKey) <- liftIO C.generateKeyPair'
|
||||
let fileDhSecret = C.dh' rKey spKey
|
||||
pure (FRFile sKey, Just ServerFile {filePath = path, fileSize = size, fileDhSecret})
|
||||
_ -> pure (FRErr AUTH, Nothing) -- TODO file-specific errors?
|
||||
|
||||
randomId :: (MonadUnliftIO m, MonadReader XFTPEnv m) => Int -> m ByteString
|
||||
randomId n = do
|
||||
gVar <- asks idsDrg
|
||||
atomically (C.pseudoRandomBytes n gVar)
|
||||
|
||||
getFileId :: M XFTPFileId
|
||||
getFileId = liftIO . getRandomBytes =<< asks (fileIdSize . config)
|
||||
|
||||
withFileLog :: (StoreLog 'WriteMode -> IO a) -> M ()
|
||||
withFileLog action = liftIO . mapM_ action =<< asks storeLog
|
||||
|
||||
incFileStat :: (FileServerStats -> TVar Int) -> M ()
|
||||
incFileStat statSel = do
|
||||
stats <- asks serverStats
|
||||
atomically $ modifyTVar (statSel stats) (+ 1)
|
||||
|
||||
saveServerStats :: M ()
|
||||
saveServerStats =
|
||||
asks (serverStatsBackupFile . config)
|
||||
>>= mapM_ (\f -> asks serverStats >>= atomically . getFileServerStatsData >>= liftIO . saveStats f)
|
||||
where
|
||||
saveStats f stats = do
|
||||
logInfo $ "saving server stats to file " <> T.pack f
|
||||
B.writeFile f $ strEncode stats
|
||||
logInfo "server stats saved"
|
||||
|
||||
restoreServerStats :: M ()
|
||||
restoreServerStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStats
|
||||
where
|
||||
restoreStats f = whenM (doesFileExist f) $ do
|
||||
logInfo $ "restoring server stats from file " <> T.pack f
|
||||
liftIO (strDecode <$> B.readFile f) >>= \case
|
||||
Right d -> do
|
||||
s <- asks serverStats
|
||||
atomically $ setFileServerStats s d
|
||||
renameFile f $ f <> ".bak"
|
||||
logInfo "server stats restored"
|
||||
Left e -> do
|
||||
logInfo $ "error restoring server stats: " <> T.pack e
|
||||
liftIO exitFailure
|
||||
|
||||
67
src/Simplex/FileTransfer/Server/Env.hs
Normal file
67
src/Simplex/FileTransfer/Server/Env.hs
Normal file
@@ -0,0 +1,67 @@
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE KindSignatures #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE StrictData #-}
|
||||
|
||||
module Simplex.FileTransfer.Server.Env where
|
||||
|
||||
import Control.Monad.IO.Unlift
|
||||
import Crypto.Random
|
||||
import Data.List.NonEmpty (NonEmpty)
|
||||
import Data.Time.Clock (getCurrentTime)
|
||||
import Data.X509.Validation (Fingerprint (..))
|
||||
import Network.Socket
|
||||
import qualified Network.TLS as T
|
||||
import Simplex.FileTransfer.Protocol (FileCmd, FileInfo)
|
||||
import Simplex.FileTransfer.Server.Stats
|
||||
import Simplex.FileTransfer.Server.Store
|
||||
import Simplex.FileTransfer.Server.StoreLog
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Protocol (RcvPublicVerifyKey)
|
||||
import Simplex.Messaging.Transport.Server (loadFingerprint, loadTLSServerParams)
|
||||
import System.IO (IOMode (..))
|
||||
import UnliftIO.STM
|
||||
|
||||
data XFTPServerConfig = XFTPServerConfig
|
||||
{ xftpPort :: ServiceName,
|
||||
fileIdSize :: Int,
|
||||
storeLogFile :: Maybe FilePath,
|
||||
filesPath :: FilePath,
|
||||
-- CA certificate private key is not needed for initialization
|
||||
caCertificateFile :: FilePath,
|
||||
privateKeyFile :: FilePath,
|
||||
certificateFile :: FilePath,
|
||||
-- stats config - see SMP server config
|
||||
logStatsInterval :: Maybe Int,
|
||||
logStatsStartTime :: Int,
|
||||
serverStatsLogFile :: FilePath,
|
||||
serverStatsBackupFile :: Maybe FilePath,
|
||||
logTLSErrors :: Bool
|
||||
}
|
||||
|
||||
data XFTPEnv = XFTPEnv
|
||||
{ config :: XFTPServerConfig,
|
||||
store :: FileStore,
|
||||
storeLog :: Maybe (StoreLog 'WriteMode),
|
||||
idsDrg :: TVar ChaChaDRG,
|
||||
serverIdentity :: C.KeyHash,
|
||||
tlsServerParams :: T.ServerParams,
|
||||
serverStats :: FileServerStats
|
||||
}
|
||||
|
||||
newXFTPServerEnv :: (MonadUnliftIO m, MonadRandom m) => XFTPServerConfig -> m XFTPEnv
|
||||
newXFTPServerEnv config@XFTPServerConfig {storeLogFile, caCertificateFile, certificateFile, privateKeyFile} = do
|
||||
idsDrg <- drgNew >>= newTVarIO
|
||||
store <- atomically newFileStore
|
||||
storeLog <- liftIO $ mapM (`readWriteFileStore` store) storeLogFile
|
||||
tlsServerParams <- liftIO $ loadTLSServerParams caCertificateFile certificateFile privateKeyFile
|
||||
Fingerprint fp <- liftIO $ loadFingerprint caCertificateFile
|
||||
serverStats <- atomically . newFileServerStats =<< liftIO getCurrentTime
|
||||
pure XFTPEnv {config, store, storeLog, idsDrg, tlsServerParams, serverIdentity = C.KeyHash fp, serverStats}
|
||||
|
||||
data XFTPRequest
|
||||
= XFTPReqNew FileInfo (NonEmpty RcvPublicVerifyKey)
|
||||
| XFTPReqCmd FileRec FileCmd
|
||||
| XFTPReqPing
|
||||
@@ -1 +1,179 @@
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedLists #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE PatternSynonyms #-}
|
||||
|
||||
module Simplex.FileTransfer.Server.Main where
|
||||
|
||||
import Data.Either (fromRight)
|
||||
import Data.Functor (($>))
|
||||
import Data.Ini (lookupValue, readIniFile)
|
||||
import Data.Maybe (fromMaybe)
|
||||
import qualified Data.Text as T
|
||||
import Network.Socket (HostName)
|
||||
import Options.Applicative
|
||||
import Simplex.FileTransfer.Server (runXFTPServer)
|
||||
import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..))
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), pattern XFTPServer)
|
||||
import Simplex.Messaging.Server.CLI
|
||||
import Simplex.Messaging.Transport.Client (TransportHost (..))
|
||||
import System.Directory (createDirectoryIfMissing, doesFileExist)
|
||||
import System.FilePath (combine)
|
||||
import System.IO (BufferMode (..), hSetBuffering, stderr, stdout)
|
||||
import Text.Read (readMaybe)
|
||||
|
||||
xftpServerVersion :: String
|
||||
xftpServerVersion = "0.1.0"
|
||||
|
||||
xftpServerCLI :: FilePath -> FilePath -> IO ()
|
||||
xftpServerCLI cfgPath logPath = do
|
||||
getCliCommand' (cliCommandP cfgPath logPath iniFile) serverVersion >>= \case
|
||||
Init opts ->
|
||||
doesFileExist iniFile >>= \case
|
||||
True -> exitError $ "Error: server is already initialized (" <> iniFile <> " exists).\nRun `" <> executableName <> " start`."
|
||||
_ -> initializeServer opts
|
||||
Start ->
|
||||
doesFileExist iniFile >>= \case
|
||||
True -> readIniFile iniFile >>= either exitError runServer
|
||||
_ -> exitError $ "Error: server is not initialized (" <> iniFile <> " does not exist).\nRun `" <> executableName <> " init`."
|
||||
Delete -> do
|
||||
confirmOrExit "WARNING: deleting the server will make all queues inaccessible, because the server identity (certificate fingerprint) will change.\nTHIS CANNOT BE UNDONE!"
|
||||
deleteDirIfExists cfgPath
|
||||
deleteDirIfExists logPath
|
||||
putStrLn "Deleted configuration and log files"
|
||||
where
|
||||
iniFile = combine cfgPath "file-server.ini"
|
||||
serverVersion = "SimpleX XFTP server v" <> xftpServerVersion
|
||||
defaultServerPort = "443"
|
||||
executableName = "file-server"
|
||||
storeLogFilePath = combine logPath "file-server-store.log"
|
||||
initializeServer InitOptions {enableStoreLog, signAlgorithm, ip, fqdn, filesPath} = do
|
||||
clearDirIfExists cfgPath
|
||||
clearDirIfExists logPath
|
||||
createDirectoryIfMissing True cfgPath
|
||||
createDirectoryIfMissing True logPath
|
||||
let x509cfg = defaultX509Config {commonName = fromMaybe ip fqdn, signAlgorithm}
|
||||
fp <- createServerX509 cfgPath x509cfg
|
||||
let host = fromMaybe (if ip == "127.0.0.1" then "<hostnames>" else ip) fqdn
|
||||
srv = ProtoServerWithAuth (XFTPServer [THDomainName host] "" (C.KeyHash fp)) Nothing
|
||||
writeFile iniFile $ iniFileContent host
|
||||
putStrLn $ "Server initialized, you can modify configuration in " <> iniFile <> ".\nRun `" <> executableName <> " start` to start server."
|
||||
warnCAPrivateKeyFile cfgPath x509cfg
|
||||
printServiceInfo serverVersion srv
|
||||
where
|
||||
iniFileContent host =
|
||||
"[STORE_LOG]\n\
|
||||
\# The server uses STM memory for persistence,\n\
|
||||
\# that will be lost on restart (e.g., as with redis).\n\
|
||||
\# This option enables saving memory to append only log,\n\
|
||||
\# and restoring it when the server is started.\n\
|
||||
\# Log is compacted on start (deleted objects are removed).\n"
|
||||
<> ("enable: " <> onOff enableStoreLog <> "\n\n")
|
||||
<> "log_stats: off\n\n\
|
||||
\[TRANSPORT]\n\
|
||||
\# host is only used to print server address on start\n"
|
||||
<> ("host: " <> host <> "\n")
|
||||
<> ("port: " <> defaultServerPort <> "\n")
|
||||
<> "log_tls_errors: off\n\n\
|
||||
\[FILES]\n"
|
||||
<> ("path: " <> filesPath <> "\n")
|
||||
runServer ini = do
|
||||
hSetBuffering stdout LineBuffering
|
||||
hSetBuffering stderr LineBuffering
|
||||
fp <- checkSavedFingerprint cfgPath defaultX509Config
|
||||
let host = fromRight "<hostnames>" $ T.unpack <$> lookupValue "TRANSPORT" "host" ini
|
||||
port = T.unpack $ strictIni "TRANSPORT" "port" ini
|
||||
cfg@XFTPServerConfig {xftpPort, storeLogFile} = serverConfig
|
||||
srv = ProtoServerWithAuth (XFTPServer [THDomainName host] (if port == "443" then "" else port) (C.KeyHash fp)) Nothing
|
||||
printServiceInfo serverVersion srv
|
||||
printXFTPConfig xftpPort storeLogFile
|
||||
runXFTPServer cfg
|
||||
where
|
||||
enableStoreLog = settingIsOn "STORE_LOG" "enable" ini
|
||||
logStats = settingIsOn "STORE_LOG" "log_stats" ini
|
||||
c = combine cfgPath . ($ defaultX509Config)
|
||||
printXFTPConfig xftpPort logFile = do
|
||||
putStrLn $ case logFile of
|
||||
Just f -> "Store log: " <> f
|
||||
_ -> "Store log disabled."
|
||||
putStrLn $ "Listening on port " <> xftpPort <> "..."
|
||||
|
||||
serverConfig =
|
||||
XFTPServerConfig
|
||||
{ xftpPort = T.unpack $ strictIni "TRANSPORT" "port" ini,
|
||||
fileIdSize = 16,
|
||||
storeLogFile = enableStoreLog $> storeLogFilePath,
|
||||
filesPath = T.unpack $ strictIni "FILES" "path" ini,
|
||||
caCertificateFile = c caCrtFile,
|
||||
privateKeyFile = c serverKeyFile,
|
||||
certificateFile = c serverCrtFile,
|
||||
logStatsInterval = logStats $> 86400, -- seconds
|
||||
logStatsStartTime = 0, -- seconds from 00:00 UTC
|
||||
serverStatsLogFile = combine logPath "file-server-stats.daily.log",
|
||||
serverStatsBackupFile = logStats $> combine logPath "file-server-stats.log",
|
||||
logTLSErrors = fromMaybe False $ iniOnOff "TRANSPORT" "log_tls_errors" ini
|
||||
}
|
||||
|
||||
data CliCommand
|
||||
= Init InitOptions
|
||||
| Start
|
||||
| Delete
|
||||
|
||||
data InitOptions = InitOptions
|
||||
{ enableStoreLog :: Bool,
|
||||
signAlgorithm :: SignAlgorithm,
|
||||
ip :: HostName,
|
||||
fqdn :: Maybe HostName,
|
||||
filesPath :: FilePath
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
cliCommandP :: FilePath -> FilePath -> FilePath -> Parser CliCommand
|
||||
cliCommandP cfgPath logPath iniFile =
|
||||
hsubparser
|
||||
( command "init" (info (Init <$> initP) (progDesc $ "Initialize server - creates " <> cfgPath <> " and " <> logPath <> " directories and configuration files"))
|
||||
<> command "start" (info (pure Start) (progDesc $ "Start server (configuration: " <> iniFile <> ")"))
|
||||
<> command "delete" (info (pure Delete) (progDesc "Delete configuration and log files"))
|
||||
)
|
||||
where
|
||||
initP :: Parser InitOptions
|
||||
initP =
|
||||
InitOptions
|
||||
<$> switch
|
||||
( long "store-log"
|
||||
<> short 'l'
|
||||
<> help "Enable store log for persistence"
|
||||
)
|
||||
<*> option
|
||||
(maybeReader readMaybe)
|
||||
( long "sign-algorithm"
|
||||
<> short 'a'
|
||||
<> help "Signature algorithm used for TLS certificates: ED25519, ED448"
|
||||
<> value ED448
|
||||
<> showDefault
|
||||
<> metavar "ALG"
|
||||
)
|
||||
<*> strOption
|
||||
( long "ip"
|
||||
<> help
|
||||
"Server IP address, used as Common Name for TLS online certificate if FQDN is not supplied"
|
||||
<> value "127.0.0.1"
|
||||
<> showDefault
|
||||
<> metavar "IP"
|
||||
)
|
||||
<*> (optional . strOption)
|
||||
( long "fqdn"
|
||||
<> short 'n'
|
||||
<> help "Server FQDN used as Common Name for TLS online certificate"
|
||||
<> showDefault
|
||||
<> metavar "FQDN"
|
||||
)
|
||||
<*> strOption
|
||||
( long "path"
|
||||
<> short 'p'
|
||||
<> help "Path to the directory to store files"
|
||||
<> metavar "PATH"
|
||||
)
|
||||
|
||||
93
src/Simplex/FileTransfer/Server/Stats.hs
Normal file
93
src/Simplex/FileTransfer/Server/Stats.hs
Normal file
@@ -0,0 +1,93 @@
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Simplex.FileTransfer.Server.Stats where
|
||||
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Time.Clock (UTCTime)
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Protocol (SenderId)
|
||||
import Simplex.Messaging.Server.Stats (PeriodStats, PeriodStatsData, getPeriodStatsData, newPeriodStats, setPeriodStats)
|
||||
import UnliftIO.STM
|
||||
|
||||
data FileServerStats = FileServerStats
|
||||
{ fromTime :: TVar UTCTime,
|
||||
filesCreated :: TVar Int,
|
||||
fileRecipients :: TVar Int,
|
||||
filesUploaded :: TVar Int,
|
||||
filesDeleted :: TVar Int,
|
||||
filesDownloaded :: PeriodStats SenderId,
|
||||
fileDownloads :: TVar Int,
|
||||
fileDownloadAcks :: TVar Int
|
||||
}
|
||||
|
||||
data FileServerStatsData = FileServerStatsData
|
||||
{ _fromTime :: UTCTime,
|
||||
_filesCreated :: Int,
|
||||
_fileRecipients :: Int,
|
||||
_filesUploaded :: Int,
|
||||
_filesDeleted :: Int,
|
||||
_filesDownloaded :: PeriodStatsData SenderId,
|
||||
_fileDownloads :: Int,
|
||||
_fileDownloadAcks :: Int
|
||||
}
|
||||
|
||||
newFileServerStats :: UTCTime -> STM FileServerStats
|
||||
newFileServerStats ts = do
|
||||
fromTime <- newTVar ts
|
||||
filesCreated <- newTVar 0
|
||||
fileRecipients <- newTVar 0
|
||||
filesUploaded <- newTVar 0
|
||||
filesDeleted <- newTVar 0
|
||||
filesDownloaded <- newPeriodStats
|
||||
fileDownloads <- newTVar 0
|
||||
fileDownloadAcks <- newTVar 0
|
||||
pure FileServerStats {fromTime, filesCreated, fileRecipients, filesUploaded, filesDeleted, filesDownloaded, fileDownloads, fileDownloadAcks}
|
||||
|
||||
getFileServerStatsData :: FileServerStats -> STM FileServerStatsData
|
||||
getFileServerStatsData s = do
|
||||
_fromTime <- readTVar $ fromTime (s :: FileServerStats)
|
||||
_filesCreated <- readTVar $ filesCreated s
|
||||
_fileRecipients <- readTVar $ fileRecipients s
|
||||
_filesUploaded <- readTVar $ filesUploaded s
|
||||
_filesDeleted <- readTVar $ filesDeleted s
|
||||
_filesDownloaded <- getPeriodStatsData $ filesDownloaded s
|
||||
_fileDownloads <- readTVar $ fileDownloads s
|
||||
_fileDownloadAcks <- readTVar $ fileDownloadAcks s
|
||||
pure FileServerStatsData {_fromTime, _filesCreated, _fileRecipients, _filesUploaded, _filesDeleted, _filesDownloaded, _fileDownloads, _fileDownloadAcks}
|
||||
|
||||
setFileServerStats :: FileServerStats -> FileServerStatsData -> STM ()
|
||||
setFileServerStats s d = do
|
||||
writeTVar (fromTime (s :: FileServerStats)) $! _fromTime (d :: FileServerStatsData)
|
||||
writeTVar (filesCreated s) $! _filesCreated d
|
||||
writeTVar (fileRecipients s) $! _fileRecipients d
|
||||
writeTVar (filesUploaded s) $! _filesUploaded d
|
||||
writeTVar (filesDeleted s) $! _filesDeleted d
|
||||
setPeriodStats (filesDownloaded s) $! _filesDownloaded d
|
||||
writeTVar (fileDownloads s) $! _fileDownloads d
|
||||
writeTVar (fileDownloadAcks s) $! _fileDownloadAcks d
|
||||
|
||||
instance StrEncoding FileServerStatsData where
|
||||
strEncode FileServerStatsData {_fromTime, _filesCreated, _fileRecipients, _filesUploaded, _filesDeleted, _filesDownloaded, _fileDownloads, _fileDownloadAcks} =
|
||||
B.unlines
|
||||
[ "fromTime=" <> strEncode _fromTime,
|
||||
"filesCreated=" <> strEncode _filesCreated,
|
||||
"fileRecipients=" <> strEncode _fileRecipients,
|
||||
"filesUploaded=" <> strEncode _filesUploaded,
|
||||
"filesDeleted=" <> strEncode _filesDeleted,
|
||||
"filesDownloaded=" <> strEncode _filesDownloaded,
|
||||
"fileDownloads=" <> strEncode _fileDownloads,
|
||||
"fileDownloadAcks=" <> strEncode _fileDownloadAcks
|
||||
]
|
||||
strP = do
|
||||
_fromTime <- "fromTime=" *> strP <* A.endOfLine
|
||||
_filesCreated <- "filesCreated=" *> strP <* A.endOfLine
|
||||
_fileRecipients <- "fileRecipients=" *> strP <* A.endOfLine
|
||||
_filesUploaded <- "filesUploaded=" *> strP <* A.endOfLine
|
||||
_filesDeleted <- "filesDeleted=" *> strP <* A.endOfLine
|
||||
_filesDownloaded <- "filesDownloaded=" *> strP <* A.endOfLine
|
||||
_fileDownloads <- "fileDownloads=" *> strP <* A.endOfLine
|
||||
_fileDownloadAcks <- "fileDownloadAcks=" *> strP <* A.endOfLine
|
||||
pure FileServerStatsData {_fromTime, _filesCreated, _fileRecipients, _filesUploaded, _filesDeleted, _filesDownloaded, _fileDownloads, _fileDownloadAcks}
|
||||
@@ -1,9 +1,12 @@
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
|
||||
module Simplex.FileTransfer.Server.Store
|
||||
( FileStore,
|
||||
newQueueStore,
|
||||
( FileStore (..),
|
||||
FileRec (..),
|
||||
newFileStore,
|
||||
addFile,
|
||||
setFilePath,
|
||||
addRecipient,
|
||||
@@ -17,7 +20,8 @@ import Control.Concurrent.STM
|
||||
import Data.Functor (($>))
|
||||
import Data.Set (Set)
|
||||
import qualified Data.Set as S
|
||||
import Simplex.FileTransfer.Protocol (FileInfo)
|
||||
import Simplex.FileTransfer.Protocol (FileInfo (..), SFileParty (..), XFTPFileId)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Protocol hiding (SParty, SRecipient, SSender)
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
@@ -36,8 +40,8 @@ data FileRec = FileRec
|
||||
}
|
||||
deriving (Eq)
|
||||
|
||||
newQueueStore :: STM FileStore
|
||||
newQueueStore = do
|
||||
newFileStore :: STM FileStore
|
||||
newFileStore = do
|
||||
files <- TM.empty
|
||||
recipients <- TM.empty
|
||||
pure FileStore {files, recipients}
|
||||
@@ -61,7 +65,7 @@ setFilePath st sId fPath =
|
||||
writeTVar filePath (Just fPath) $> Right ()
|
||||
|
||||
addRecipient :: FileStore -> SenderId -> (RecipientId, RcvPublicVerifyKey) -> STM (Either ErrorType ())
|
||||
addRecipient st@FileStore {recipients} senderId recipient@(rId, _) =
|
||||
addRecipient st@FileStore {recipients} senderId (rId, rKey) =
|
||||
withFile st senderId $ \FileRec {recipientIds} -> do
|
||||
rIds <- readTVar recipientIds
|
||||
mem <- TM.member rId recipients
|
||||
@@ -69,7 +73,7 @@ addRecipient st@FileStore {recipients} senderId recipient@(rId, _) =
|
||||
then pure $ Left DUPLICATE_
|
||||
else do
|
||||
writeTVar recipientIds $! S.insert rId rIds
|
||||
TM.insert rId recipient recipients
|
||||
TM.insert rId (senderId, rKey) recipients
|
||||
pure $ Right ()
|
||||
|
||||
deleteFile :: FileStore -> SenderId -> STM (Either ErrorType ())
|
||||
@@ -80,8 +84,13 @@ deleteFile FileStore {files, recipients} senderId = do
|
||||
pure $ Right ()
|
||||
_ -> pure $ Left AUTH
|
||||
|
||||
getFile :: FileStore -> SenderId -> STM (Either ErrorType FileRec)
|
||||
getFile st sId = withFile st sId $ pure . Right
|
||||
getFile :: FileStore -> SFileParty p -> XFTPFileId -> STM (Either ErrorType (FileRec, C.APublicVerifyKey))
|
||||
getFile st party fId = case party of
|
||||
SSender -> withFile st fId $ pure . Right . (\f -> (f, sndKey $ fileInfo f))
|
||||
SRecipient ->
|
||||
TM.lookup fId (recipients st) >>= \case
|
||||
Just (sId, rKey) -> withFile st sId $ pure . Right . (,rKey)
|
||||
_ -> pure $ Left AUTH
|
||||
|
||||
ackFile :: FileStore -> RecipientId -> STM (Either ErrorType ())
|
||||
ackFile st@FileStore {recipients} recipientId = do
|
||||
|
||||
@@ -1 +1,123 @@
|
||||
module Simplex.FileTransfer.Server.StoreLog where
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Simplex.FileTransfer.Server.StoreLog
|
||||
( StoreLog,
|
||||
FileStoreLogRecord (..),
|
||||
closeStoreLog,
|
||||
readWriteFileStore,
|
||||
logAddFile,
|
||||
logPutFile,
|
||||
logAddRecipients,
|
||||
logDeleteFile,
|
||||
logAckFile,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import Control.Monad.Except
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Composition ((.:))
|
||||
import Data.List.NonEmpty (NonEmpty)
|
||||
import qualified Data.List.NonEmpty as L
|
||||
import Data.Map.Strict (Map)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Simplex.FileTransfer.Protocol (FileInfo (..))
|
||||
import Simplex.FileTransfer.Server.Store
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Protocol (RcvPublicVerifyKey, RecipientId, SenderId)
|
||||
import Simplex.Messaging.Server.StoreLog
|
||||
import Simplex.Messaging.Util (bshow, whenM)
|
||||
import System.Directory (doesFileExist, renameFile)
|
||||
import System.IO
|
||||
|
||||
data FileStoreLogRecord
|
||||
= AddFile SenderId FileInfo
|
||||
| PutFile SenderId FilePath
|
||||
| AddRecipients SenderId (NonEmpty (RecipientId, RcvPublicVerifyKey))
|
||||
| DeleteFile SenderId
|
||||
| AckFile RecipientId
|
||||
|
||||
instance StrEncoding FileStoreLogRecord where
|
||||
strEncode = \case
|
||||
AddFile sId file -> strEncode (Str "FNEW", sId, file)
|
||||
PutFile sId path -> strEncode (Str "FPUT", sId, path)
|
||||
AddRecipients sId rcps -> strEncode (Str "FADD", sId, rcps)
|
||||
DeleteFile sId -> strEncode (Str "FDEL", sId)
|
||||
AckFile rId -> strEncode (Str "FACK", rId)
|
||||
strP =
|
||||
A.choice
|
||||
[ "FNEW " *> (AddFile <$> strP_ <*> strP),
|
||||
"FPUT " *> (PutFile <$> strP_ <*> strP),
|
||||
"FADD " *> (AddRecipients <$> strP_ <*> strP),
|
||||
"FDEL " *> (DeleteFile <$> strP),
|
||||
"FACK " *> (AckFile <$> strP)
|
||||
]
|
||||
|
||||
logFileStoreRecord :: StoreLog 'WriteMode -> FileStoreLogRecord -> IO ()
|
||||
logFileStoreRecord = writeStoreLogRecord
|
||||
|
||||
logAddFile :: StoreLog 'WriteMode -> SenderId -> FileInfo -> IO ()
|
||||
logAddFile s = logFileStoreRecord s .: AddFile
|
||||
|
||||
logPutFile :: StoreLog 'WriteMode -> SenderId -> FilePath -> IO ()
|
||||
logPutFile s = logFileStoreRecord s .: PutFile
|
||||
|
||||
logAddRecipients :: StoreLog 'WriteMode -> SenderId -> NonEmpty (RecipientId, RcvPublicVerifyKey) -> IO ()
|
||||
logAddRecipients s = logFileStoreRecord s .: AddRecipients
|
||||
|
||||
logDeleteFile :: StoreLog 'WriteMode -> SenderId -> IO ()
|
||||
logDeleteFile s = logFileStoreRecord s . DeleteFile
|
||||
|
||||
logAckFile :: StoreLog 'WriteMode -> RecipientId -> IO ()
|
||||
logAckFile s = logFileStoreRecord s . AckFile
|
||||
|
||||
readWriteFileStore :: FilePath -> FileStore -> IO (StoreLog 'WriteMode)
|
||||
readWriteFileStore f st = do
|
||||
whenM (doesFileExist f) $ do
|
||||
readFileStore f st
|
||||
renameFile f $ f <> ".bak"
|
||||
s <- openWriteStoreLog f
|
||||
writeFileStore s st
|
||||
pure s
|
||||
|
||||
readFileStore :: FilePath -> FileStore -> IO ()
|
||||
readFileStore f st = mapM_ addFileLogRecord . B.lines =<< B.readFile f
|
||||
where
|
||||
addFileLogRecord s = case strDecode s of
|
||||
Left e -> B.putStrLn $ "Log parsing error (" <> B.pack e <> "): " <> B.take 100 s
|
||||
Right lr ->
|
||||
atomically (addToStore lr) >>= \case
|
||||
Left e -> B.putStrLn $ "Log processing error (" <> bshow e <> "): " <> B.take 100 s
|
||||
_ -> pure ()
|
||||
addToStore = \case
|
||||
AddFile sId file -> addFile st sId file
|
||||
PutFile qId path -> setFilePath st qId path
|
||||
AddRecipients sId rcps -> runExceptT $ addRecipients sId rcps
|
||||
DeleteFile sId -> deleteFile st sId
|
||||
AckFile rId -> ackFile st rId
|
||||
addRecipients sId rcps = mapM_ (ExceptT . addRecipient st sId) rcps
|
||||
|
||||
writeFileStore :: StoreLog 'WriteMode -> FileStore -> IO ()
|
||||
writeFileStore s FileStore {files, recipients} = do
|
||||
allRcps <- readTVarIO recipients
|
||||
readTVarIO files >>= mapM_ (logFile allRcps)
|
||||
where
|
||||
logFile :: Map RecipientId (SenderId, RcvPublicVerifyKey) -> FileRec -> IO ()
|
||||
logFile allRcps FileRec {senderId, fileInfo, filePath, recipientIds} = do
|
||||
logAddFile s senderId fileInfo
|
||||
(rcpErrs, rcps) <- M.mapEither getRcp . M.fromSet id <$> readTVarIO recipientIds
|
||||
mapM_ (logAddRecipients s senderId) $ L.nonEmpty $ M.elems rcps
|
||||
mapM_ (B.putStrLn . ("Error storing log: " <>)) rcpErrs
|
||||
readTVarIO filePath >>= mapM_ (logPutFile s senderId)
|
||||
where
|
||||
getRcp rId = case M.lookup rId allRcps of
|
||||
Just (sndId, rKey)
|
||||
| sndId == senderId -> Right (rId, rKey)
|
||||
| otherwise -> Left $ "sender ID for recipient ID " <> bshow rId <> " does not match FileRec"
|
||||
Nothing -> Left $ "recipient ID " <> bshow rId <> " not found"
|
||||
|
||||
42
src/Simplex/FileTransfer/Transport.hs
Normal file
42
src/Simplex/FileTransfer/Transport.hs
Normal file
@@ -0,0 +1,42 @@
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
|
||||
module Simplex.FileTransfer.Transport
|
||||
( supportedFileServerVRange,
|
||||
sendFile,
|
||||
receiveFile,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Monad.Except
|
||||
import Data.ByteString.Builder (Builder, byteString)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import GHC.IO.Handle.Internals (ioe_EOF)
|
||||
import Simplex.FileTransfer.Protocol (xftpBlockSize)
|
||||
import Simplex.Messaging.Version
|
||||
import System.IO (Handle)
|
||||
|
||||
supportedFileServerVRange :: VersionRange
|
||||
supportedFileServerVRange = mkVersionRange 1 1
|
||||
|
||||
sendFile :: Handle -> (Builder -> IO ()) -> Int -> IO ()
|
||||
sendFile _ _ 0 = pure ()
|
||||
sendFile h send sz = do
|
||||
B.hGet h xftpBlockSize >>= \case
|
||||
"" -> when (sz /= 0) ioe_EOF
|
||||
ch -> do
|
||||
let ch' = B.take sz ch -- sz >= xftpBlockSize
|
||||
send (byteString ch')
|
||||
sendFile h send $ sz - B.length ch'
|
||||
|
||||
-- TODO instead of receiving the whole file this function should stop at size and return error if file is larger
|
||||
receiveFile :: Handle -> (Int -> IO ByteString) -> Int -> IO Int
|
||||
receiveFile h receive sz = do
|
||||
ch <- receive xftpBlockSize
|
||||
let chSize = B.length ch
|
||||
if chSize > 0
|
||||
then B.hPut h ch >> receiveFile h receive (sz + chSize)
|
||||
else pure sz
|
||||
@@ -36,7 +36,7 @@ import Data.Text.Encoding (decodeLatin1, encodeUtf8)
|
||||
import Data.Time.Clock (UTCTime)
|
||||
import Data.Time.Clock.System (SystemTime (..))
|
||||
import Data.Time.Format.ISO8601
|
||||
import Data.Word (Word16)
|
||||
import Data.Word (Word16, Word32)
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Parsers (parseAll)
|
||||
import Simplex.Messaging.Util ((<$?>))
|
||||
@@ -75,6 +75,10 @@ instance StrEncoding Str where
|
||||
strEncode = unStr
|
||||
strP = Str <$> A.takeTill (== ' ') <* optional A.space
|
||||
|
||||
instance StrEncoding FilePath where
|
||||
strEncode = strEncode
|
||||
strDecode = strDecode
|
||||
|
||||
instance ToJSON Str where
|
||||
toJSON (Str s) = strToJSON s
|
||||
toEncoding (Str s) = strToJEncoding s
|
||||
@@ -94,6 +98,12 @@ instance StrEncoding Word16 where
|
||||
strP = A.decimal
|
||||
{-# INLINE strP #-}
|
||||
|
||||
instance StrEncoding Word32 where
|
||||
strEncode = B.pack . show
|
||||
{-# INLINE strEncode #-}
|
||||
strP = A.decimal
|
||||
{-# INLINE strP #-}
|
||||
|
||||
instance StrEncoding Char where
|
||||
strEncode = smpEncode
|
||||
{-# INLINE strEncode #-}
|
||||
|
||||
@@ -343,6 +343,7 @@ apnsPushProviderClient c@APNSPushClient {nonceDrg, apnsCfg} tkn@NtfTknData {toke
|
||||
nonce <- atomically $ C.pseudoRandomCbNonce nonceDrg
|
||||
apnsNtf <- liftEither $ first PPCryptoError $ apnsNotification tkn nonce (paddedNtfLength apnsCfg) pn
|
||||
req <- liftIO $ apnsRequest c tknStr apnsNtf
|
||||
-- TODO if HTTP2 client is thread-safe, we can use sendRequestDirect (the tests pass)
|
||||
HTTP2Response {response, respBody = HTTP2Body {bodyHead}} <- liftHTTPS2 $ sendRequest http2 req
|
||||
let status = H.responseStatus response
|
||||
reason' = maybe "" reason $ J.decodeStrict' bodyHead
|
||||
|
||||
@@ -629,7 +629,7 @@ type XFTPServer = ProtocolServer 'PXFTP
|
||||
pattern XFTPServer :: NonEmpty TransportHost -> ServiceName -> C.KeyHash -> ProtocolServer 'PXFTP
|
||||
pattern XFTPServer host port keyHash = ProtocolServer SPXFTP host port keyHash
|
||||
|
||||
{-# COMPLETE NtfServer #-}
|
||||
{-# COMPLETE XFTPServer #-}
|
||||
|
||||
sameSrvAddr' :: ProtoServerWithAuth p -> ProtoServerWithAuth p -> Bool
|
||||
sameSrvAddr' (ProtoServerWithAuth srv _) (ProtoServerWithAuth srv' _) = sameSrvAddr srv srv'
|
||||
@@ -1217,7 +1217,7 @@ tPut th trs
|
||||
Just ts' -> encodeBatch n' s' ts'
|
||||
_ -> (n', s', Nothing)
|
||||
|
||||
tEncode :: C.CryptoSignature s => (s, ByteString) -> ByteString
|
||||
tEncode :: (Maybe C.ASignature, ByteString) -> ByteString
|
||||
tEncode (sig, t) = smpEncode (C.signatureBytes sig) <> t
|
||||
|
||||
tEncodeBatch :: Int -> ByteString -> ByteString
|
||||
|
||||
@@ -300,7 +300,7 @@ disconnectTransport THandle {connection} c activeAt expCfg = do
|
||||
data VerificationResult = VRVerified (Maybe QueueRec) | VRFailed
|
||||
|
||||
verifyTransmission :: Maybe C.ASignature -> ByteString -> QueueId -> Cmd -> M VerificationResult
|
||||
verifyTransmission sig_ signed queueId cmd = do
|
||||
verifyTransmission sig_ signed queueId cmd =
|
||||
case cmd of
|
||||
Cmd SRecipient (NEW k _ _) -> pure $ Nothing `verified` verifyCmdSignature sig_ signed k
|
||||
Cmd SRecipient _ -> verifyCmd SRecipient $ verifyCmdSignature sig_ signed . recipientKey
|
||||
@@ -311,7 +311,7 @@ verifyTransmission sig_ signed queueId cmd = do
|
||||
verifyCmd :: SParty p -> (QueueRec -> Bool) -> M VerificationResult
|
||||
verifyCmd party f = do
|
||||
st <- asks queueStore
|
||||
q_ <- atomically (getQueue st party queueId)
|
||||
q_ <- atomically $ getQueue st party queueId
|
||||
pure $ case q_ of
|
||||
Right q -> Just q `verified` f q
|
||||
_ -> maybe False (dummyVerifyCmd signed) sig_ `seq` VRFailed
|
||||
|
||||
@@ -303,7 +303,7 @@ transportErrorP =
|
||||
"BLOCK" $> TEBadBlock
|
||||
<|> "LARGE_MSG" $> TELargeMsg
|
||||
<|> "SESSION" $> TEBadSession
|
||||
<|> TEHandshake <$> parseRead1
|
||||
<|> "HANDSHAKE " *> (TEHandshake <$> parseRead1)
|
||||
|
||||
-- | Serialize SMP encrypted transport error.
|
||||
serializeTransportError :: TransportError -> ByteString
|
||||
@@ -311,7 +311,7 @@ serializeTransportError = \case
|
||||
TEBadBlock -> "BLOCK"
|
||||
TELargeMsg -> "LARGE_MSG"
|
||||
TEBadSession -> "SESSION"
|
||||
TEHandshake e -> bshow e
|
||||
TEHandshake e -> "HANDSHAKE " <> bshow e
|
||||
|
||||
-- | Pad and send block to SMP transport.
|
||||
tPutBlock :: Transport c => THandle c -> ByteString -> IO (Either TransportError ())
|
||||
|
||||
@@ -22,7 +22,7 @@ import qualified System.TimeManager as TI
|
||||
defaultHTTP2BufferSize :: BufferSize
|
||||
defaultHTTP2BufferSize = 32768
|
||||
|
||||
withHTTP2 :: BufferSize -> (Config -> SessionId -> IO ()) -> TLS -> IO ()
|
||||
withHTTP2 :: BufferSize -> (Config -> SessionId -> IO a) -> TLS -> IO a
|
||||
withHTTP2 sz run c = E.bracket (allocHTTP2Config c sz) freeSimpleConfig (`run` tlsUniq c)
|
||||
|
||||
allocHTTP2Config :: TLS -> BufferSize -> IO Config
|
||||
@@ -56,25 +56,26 @@ data HTTP2Body = HTTP2Body
|
||||
|
||||
class HTTP2BodyChunk a where
|
||||
getBodyChunk :: a -> IO ByteString
|
||||
getBodeSize :: a -> Maybe Int
|
||||
getBodySize :: a -> Maybe Int
|
||||
|
||||
instance HTTP2BodyChunk HC.Response where
|
||||
getBodyChunk = HC.getResponseBodyChunk
|
||||
{-# INLINE getBodyChunk #-}
|
||||
getBodeSize = HC.responseBodySize
|
||||
{-# INLINE getBodeSize #-}
|
||||
getBodySize = HC.responseBodySize
|
||||
{-# INLINE getBodySize #-}
|
||||
|
||||
instance HTTP2BodyChunk HS.Request where
|
||||
getBodyChunk = HS.getRequestBodyChunk
|
||||
{-# INLINE getBodyChunk #-}
|
||||
getBodeSize = HS.requestBodySize
|
||||
{-# INLINE getBodeSize #-}
|
||||
getBodySize = HS.requestBodySize
|
||||
{-# INLINE getBodySize #-}
|
||||
|
||||
getHTTP2Body :: HTTP2BodyChunk a => a -> Int -> IO HTTP2Body
|
||||
getHTTP2Body r n = do
|
||||
bodyBuffer <- atomically newTBuffer
|
||||
let getPart n' = getBuffered bodyBuffer n' $ getBodyChunk r
|
||||
bodyHead <- getPart n
|
||||
let bodySize = fromMaybe 0 $ getBodeSize r
|
||||
bodyPart = if bodySize > n && B.length bodyHead == n then Just getPart else Nothing
|
||||
let bodySize = fromMaybe 0 $ getBodySize r
|
||||
-- TODO check bodySize once it is set
|
||||
bodyPart = if B.length bodyHead == n then Just getPart else Nothing
|
||||
pure HTTP2Body {bodyHead, bodySize, bodyPart, bodyBuffer}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
@@ -6,10 +7,11 @@
|
||||
module Simplex.Messaging.Transport.HTTP2.Client where
|
||||
|
||||
import Control.Concurrent.Async
|
||||
import Control.Exception (IOException)
|
||||
import Control.Exception (IOException, try)
|
||||
import qualified Control.Exception as E
|
||||
import Control.Monad.Except
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import Data.Functor (($>))
|
||||
import Data.Time (UTCTime, getCurrentTime)
|
||||
import qualified Data.X509.CertificateStore as XS
|
||||
import Network.HPACK (BufferSize)
|
||||
@@ -27,14 +29,16 @@ import UnliftIO.STM
|
||||
import UnliftIO.Timeout
|
||||
|
||||
data HTTP2Client = HTTP2Client
|
||||
{ action :: Maybe (Async ()),
|
||||
{ action :: Maybe (Async HTTP2Response),
|
||||
sessionId :: SessionId,
|
||||
sessionTs :: UTCTime,
|
||||
sendReq :: Request -> (Response -> IO HTTP2Response) -> IO HTTP2Response,
|
||||
client_ :: HClient
|
||||
}
|
||||
|
||||
data HClient = HClient
|
||||
{ connected :: TVar Bool,
|
||||
disconnected :: IO (),
|
||||
host :: TransportHost,
|
||||
port :: ServiceName,
|
||||
config :: HTTP2ClientConfig,
|
||||
@@ -82,7 +86,7 @@ getVerifiedHTTP2Client proxyUsername host port keyHash caStore config@HTTP2Clien
|
||||
mkHTTPS2Client = do
|
||||
connected <- newTVar False
|
||||
reqQ <- newTBQueue $ qSize config
|
||||
pure HClient {connected, host, port, config, reqQ}
|
||||
pure HClient {connected, disconnected, host, port, config, reqQ}
|
||||
|
||||
runClient :: HClient -> IO (Either HTTP2ClientError HTTP2Client)
|
||||
runClient c = do
|
||||
@@ -97,21 +101,23 @@ getVerifiedHTTP2Client proxyUsername host port keyHash caStore config@HTTP2Clien
|
||||
Just (Left e) -> Left e
|
||||
Nothing -> Left HCNetworkError
|
||||
|
||||
client :: HClient -> TMVar (Either HTTP2ClientError HTTP2Client) -> SessionId -> H.Client ()
|
||||
client :: HClient -> TMVar (Either HTTP2ClientError HTTP2Client) -> SessionId -> H.Client HTTP2Response
|
||||
client c cVar sessionId sendReq = do
|
||||
sessionTs <- getCurrentTime
|
||||
let c' = HTTP2Client {action = Nothing, client_ = c, sessionId, sessionTs}
|
||||
let c' = HTTP2Client {action = Nothing, client_ = c, sendReq, sessionId, sessionTs}
|
||||
atomically $ do
|
||||
writeTVar (connected c) True
|
||||
putTMVar cVar (Right c')
|
||||
process c' sendReq `E.finally` disconnected
|
||||
|
||||
process :: HTTP2Client -> H.Client ()
|
||||
process :: HTTP2Client -> H.Client HTTP2Response
|
||||
process HTTP2Client {client_ = HClient {reqQ}} sendReq = forever $ do
|
||||
(req, respVar) <- atomically $ readTBQueue reqQ
|
||||
sendReq req $ \r -> do
|
||||
respBody <- getHTTP2Body r bodyHeadSize
|
||||
atomically $ putTMVar respVar HTTP2Response {response = r, respBody}
|
||||
let resp = HTTP2Response {response = r, respBody}
|
||||
atomically $ putTMVar respVar resp
|
||||
pure resp
|
||||
|
||||
-- | Disconnects client from the server and terminates client threads.
|
||||
closeHTTP2Client :: HTTP2Client -> IO ()
|
||||
@@ -123,8 +129,20 @@ sendRequest HTTP2Client {client_ = HClient {config, reqQ}} req = do
|
||||
atomically $ writeTBQueue reqQ (req, resp)
|
||||
maybe (Left HCResponseTimeout) Right <$> (connTimeout config `timeout` atomically (takeTMVar resp))
|
||||
|
||||
runHTTP2Client :: T.Supported -> Maybe XS.CertificateStore -> TransportClientConfig -> BufferSize -> Maybe ByteString -> TransportHost -> ServiceName -> Maybe C.KeyHash -> (SessionId -> H.Client ()) -> IO ()
|
||||
sendRequestDirect :: HTTP2Client -> Request -> IO (Either HTTP2ClientError HTTP2Response)
|
||||
sendRequestDirect HTTP2Client {client_ = HClient {config, disconnected}, sendReq} req =
|
||||
connTimeout config `timeout` try (sendReq req process) >>= \case
|
||||
Just (Right r) -> pure $ Right r
|
||||
Just (Left e) -> disconnected $> Left (HCIOError e)
|
||||
Nothing -> pure $ Left HCNetworkError
|
||||
where
|
||||
process r = do
|
||||
respBody <- getHTTP2Body r $ bodyHeadSize config
|
||||
pure HTTP2Response {response = r, respBody}
|
||||
|
||||
runHTTP2Client :: forall a. T.Supported -> Maybe XS.CertificateStore -> TransportClientConfig -> BufferSize -> Maybe ByteString -> TransportHost -> ServiceName -> Maybe C.KeyHash -> (SessionId -> H.Client a) -> IO a
|
||||
runHTTP2Client tlsParams caStore tcConfig bufferSize proxyUsername host port keyHash client =
|
||||
runTLSTransportClient tlsParams caStore tcConfig proxyUsername host port keyHash $ withHTTP2 bufferSize run
|
||||
where
|
||||
run :: H.Config -> SessionId -> IO a
|
||||
run cfg = H.run (ClientConfig "https" (strEncode host) 20) cfg . client
|
||||
|
||||
@@ -85,7 +85,7 @@ agentCfgRatchetV1 = agentCfg {e2eEncryptVRange = vr11}
|
||||
vr11 :: VersionRange
|
||||
vr11 = mkVersionRange 1 1
|
||||
|
||||
runRight_ :: HasCallStack => ExceptT AgentErrorType IO () -> Expectation
|
||||
runRight_ :: (Eq e, Show e, HasCallStack) => ExceptT e IO () -> Expectation
|
||||
runRight_ action = runExceptT action `shouldReturn` Right ()
|
||||
|
||||
runRight :: HasCallStack => ExceptT AgentErrorType IO a -> IO a
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE PatternSynonyms #-}
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
{-# OPTIONS_GHC -fno-warn-incomplete-uni-patterns #-}
|
||||
|
||||
module AgentTests.NotificationTests where
|
||||
@@ -464,7 +465,7 @@ testNotificationsSMPRestart t APNSMockServer {apnsQ} = do
|
||||
liftIO $ killThread threadId
|
||||
pure (aliceId, bobId)
|
||||
|
||||
runRight_ $ do
|
||||
runRight_ @AgentErrorType $ do
|
||||
get alice =##> \case ("", "", DOWN _ [c]) -> c == bobId; _ -> False
|
||||
get bob =##> \case ("", "", DOWN _ [c]) -> c == aliceId; _ -> False
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ module CLITests where
|
||||
|
||||
import Data.Ini (lookupValue, readIniFile)
|
||||
import Data.List (isPrefixOf)
|
||||
import Simplex.FileTransfer.Server.Main (xftpServerCLI, xftpServerVersion)
|
||||
import Simplex.Messaging.Notifications.Server.Main
|
||||
import Simplex.Messaging.Server.Main
|
||||
import Simplex.Messaging.Transport (simplexMQVersion)
|
||||
@@ -27,6 +28,12 @@ ntfCfgPath = "tests/tmp/cli/etc/opt/simplex-notifications"
|
||||
ntfLogPath :: FilePath
|
||||
ntfLogPath = "tests/tmp/cli/etc/var/simplex-notifications"
|
||||
|
||||
fileCfgPath :: FilePath
|
||||
fileCfgPath = "tests/tmp/cli/etc/opt/simplex-files"
|
||||
|
||||
fileLogPath :: FilePath
|
||||
fileLogPath = "tests/tmp/cli/etc/var/simplex-files"
|
||||
|
||||
cliTests :: Spec
|
||||
cliTests = do
|
||||
describe "SMP server CLI" $ do
|
||||
@@ -38,6 +45,9 @@ cliTests = do
|
||||
describe "Ntf server CLI" $ do
|
||||
it "should initialize, start and delete the server (no store log)" $ ntfServerTest False
|
||||
it "should initialize, start and delete the server (with store log)" $ ntfServerTest True
|
||||
describe "XFTP server CLI" $ do
|
||||
it "should initialize, start and delete the server (no store log)" $ xftpServerTest False
|
||||
it "should initialize, start and delete the server (with store log)" $ xftpServerTest True
|
||||
|
||||
smpServerTest :: Bool -> Bool -> IO ()
|
||||
smpServerTest storeLog basicAuth = do
|
||||
@@ -78,3 +88,20 @@ ntfServerTest storeLog = do
|
||||
capture_ (withStdin "Y" . withArgs ["delete"] $ ntfServerCLI ntfCfgPath ntfLogPath)
|
||||
>>= (`shouldSatisfy` ("WARNING: deleting the server will make all queues inaccessible" `isPrefixOf`))
|
||||
doesFileExist (cfgPath <> "/ca.key") `shouldReturn` False
|
||||
|
||||
xftpServerTest :: Bool -> IO ()
|
||||
xftpServerTest storeLog = do
|
||||
capture_ (withArgs (["init", "-p tests/tmp"] <> ["-l" | storeLog]) $ xftpServerCLI fileCfgPath fileLogPath)
|
||||
>>= (`shouldSatisfy` (("Server initialized, you can modify configuration in " <> fileCfgPath <> "/file-server.ini") `isPrefixOf`))
|
||||
Right ini <- readIniFile $ fileCfgPath <> "/file-server.ini"
|
||||
lookupValue "STORE_LOG" "enable" ini `shouldBe` Right (if storeLog then "on" else "off")
|
||||
lookupValue "STORE_LOG" "log_stats" ini `shouldBe` Right "off"
|
||||
lookupValue "TRANSPORT" "port" ini `shouldBe` Right "443"
|
||||
doesFileExist (fileCfgPath <> "/ca.key") `shouldReturn` True
|
||||
r <- lines <$> capture_ (withArgs ["start"] $ (100000 `timeout` xftpServerCLI fileCfgPath fileLogPath) `catchAll_` pure (Just ()))
|
||||
r `shouldContain` ["SimpleX XFTP server v" <> xftpServerVersion]
|
||||
r `shouldContain` (if storeLog then ["Store log: " <> fileLogPath <> "/file-server-store.log"] else ["Store log disabled."])
|
||||
r `shouldContain` ["Listening on port 443..."]
|
||||
capture_ (withStdin "Y" . withArgs ["delete"] $ xftpServerCLI fileCfgPath fileLogPath)
|
||||
>>= (`shouldSatisfy` ("WARNING: deleting the server will make all queues inaccessible" `isPrefixOf`))
|
||||
doesFileExist (cfgPath <> "/ca.key") `shouldReturn` False
|
||||
|
||||
@@ -30,6 +30,7 @@ import Network.HTTP.Types (Status)
|
||||
import qualified Network.HTTP.Types as N
|
||||
import qualified Network.HTTP2.Server as H
|
||||
import Network.Socket
|
||||
import SMPClient (serverBracket)
|
||||
import Simplex.Messaging.Client (chooseTransportHost, defaultNetworkConfig)
|
||||
import Simplex.Messaging.Client.Agent (defaultSMPClientAgentConfig)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
@@ -48,7 +49,6 @@ import UnliftIO.Async
|
||||
import UnliftIO.Concurrent
|
||||
import qualified UnliftIO.Exception as E
|
||||
import UnliftIO.STM
|
||||
import UnliftIO.Timeout (timeout)
|
||||
|
||||
testHost :: NonEmpty TransportHost
|
||||
testHost = "localhost"
|
||||
@@ -115,19 +115,6 @@ withNtfServerCfg t cfg =
|
||||
(\started -> runNtfServerBlocking started cfg {transports = [(ntfTestPort, t)]})
|
||||
(pure ())
|
||||
|
||||
serverBracket :: MonadUnliftIO m => (TMVar Bool -> m ()) -> m () -> (ThreadId -> m a) -> m a
|
||||
serverBracket process afterProcess f = do
|
||||
started <- newEmptyTMVarIO
|
||||
E.bracket
|
||||
(forkIOWithUnmask ($ process started))
|
||||
(\t -> killThread t >> afterProcess >> waitFor started "stop")
|
||||
(\t -> waitFor started "start" >> f t)
|
||||
where
|
||||
waitFor started s =
|
||||
5_000_000 `timeout` atomically (takeTMVar started) >>= \case
|
||||
Nothing -> error $ "server did not " <> s
|
||||
_ -> pure ()
|
||||
|
||||
withNtfServerOn :: ATransport -> ServiceName -> IO a -> IO a
|
||||
withNtfServerOn t port' = withNtfServerThreadOn t port' . const
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@ import Simplex.Messaging.Transport.WebSockets (WS)
|
||||
import System.Directory (createDirectoryIfMissing, removeDirectoryRecursive)
|
||||
import System.Environment (setEnv)
|
||||
import Test.Hspec
|
||||
import XFTPServerTests (xftpServerTests)
|
||||
|
||||
logCfg :: LogConfig
|
||||
logCfg = LogConfig {lc_file = Nothing, lc_stderr = True}
|
||||
@@ -40,5 +41,7 @@ main = do
|
||||
describe "SMP server via WebSockets" $ serverTests (transport @WS)
|
||||
describe "Notifications server" $ ntfServerTests (transport @TLS)
|
||||
describe "SMP client agent" $ agentTests (transport @TLS)
|
||||
describe "XFTP" $ do
|
||||
describe "XFTP server" xftpServerTests
|
||||
describe "XFTP file description" fileDescriptionTests
|
||||
describe "Server CLIs" cliTests
|
||||
describe "File description" fileDescriptionTests
|
||||
|
||||
61
tests/XFTPClient.hs
Normal file
61
tests/XFTPClient.hs
Normal file
@@ -0,0 +1,61 @@
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE RankNTypes #-}
|
||||
|
||||
module XFTPClient where
|
||||
|
||||
import Control.Concurrent (ThreadId)
|
||||
import Network.Socket (ServiceName)
|
||||
import SMPClient (serverBracket)
|
||||
import Simplex.FileTransfer.Client
|
||||
import Simplex.FileTransfer.Server (runXFTPServerBlocking)
|
||||
import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..))
|
||||
import Simplex.Messaging.Protocol (XFTPServer)
|
||||
import Test.Hspec
|
||||
|
||||
xftpTest :: HasCallStack => (HasCallStack => XFTPClient -> IO ()) -> Expectation
|
||||
xftpTest test = runXFTPTest test `shouldReturn` ()
|
||||
|
||||
runXFTPTest :: HasCallStack => (HasCallStack => XFTPClient -> IO a) -> IO a
|
||||
runXFTPTest test = withXFTPServer $ testXFTPClient test
|
||||
|
||||
withXFTPServerCfg :: HasCallStack => XFTPServerConfig -> (HasCallStack => ThreadId -> IO a) -> IO a
|
||||
withXFTPServerCfg cfg =
|
||||
serverBracket
|
||||
(`runXFTPServerBlocking` cfg)
|
||||
(pure ())
|
||||
|
||||
withXFTPServer :: IO a -> IO a
|
||||
withXFTPServer = withXFTPServerCfg testXFTPServerConfig . const
|
||||
|
||||
xftpTestPort :: ServiceName
|
||||
xftpTestPort = "7000"
|
||||
|
||||
testXFTPServer :: XFTPServer
|
||||
testXFTPServer = "xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7000"
|
||||
|
||||
testXFTPServerConfig :: XFTPServerConfig
|
||||
testXFTPServerConfig =
|
||||
XFTPServerConfig
|
||||
{ xftpPort = xftpTestPort,
|
||||
fileIdSize = 16,
|
||||
storeLogFile = Nothing,
|
||||
filesPath = "tests/xftp-files",
|
||||
caCertificateFile = "tests/fixtures/ca.crt",
|
||||
privateKeyFile = "tests/fixtures/server.key",
|
||||
certificateFile = "tests/fixtures/server.crt",
|
||||
logStatsInterval = Nothing,
|
||||
logStatsStartTime = 0,
|
||||
serverStatsLogFile = "tests/xftp-server-stats.daily.log",
|
||||
serverStatsBackupFile = Nothing,
|
||||
logTLSErrors = True
|
||||
}
|
||||
|
||||
testXFTPClientConfig :: XFTPClientConfig
|
||||
testXFTPClientConfig = defaultXFTPClientConfig
|
||||
|
||||
testXFTPClient :: HasCallStack => (HasCallStack => XFTPClient -> IO a) -> IO a
|
||||
testXFTPClient client =
|
||||
getXFTPClient (1, testXFTPServer, Nothing) testXFTPClientConfig (pure ()) >>= \case
|
||||
Right c -> client c
|
||||
Left e -> error $ show e
|
||||
57
tests/XFTPServerTests.hs
Normal file
57
tests/XFTPServerTests.hs
Normal file
@@ -0,0 +1,57 @@
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedLists #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module XFTPServerTests where
|
||||
|
||||
import AgentTests.FunctionalAPITests (runRight_)
|
||||
import Control.Monad.IO.Class (liftIO)
|
||||
import Crypto.Random (getRandomBytes)
|
||||
import qualified Data.ByteString.Base64.URL as B64
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Simplex.FileTransfer.Client
|
||||
import Simplex.FileTransfer.Protocol (FileInfo (..))
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Protocol (SenderId)
|
||||
import System.Directory (createDirectoryIfMissing, removeDirectoryRecursive)
|
||||
import System.IO (IOMode (..), withFile)
|
||||
import Test.Hspec
|
||||
import XFTPClient
|
||||
|
||||
xftpServerTests :: Spec
|
||||
xftpServerTests =
|
||||
before_ (createDirectoryIfMissing False "tests/xftp-files")
|
||||
. after_ (removeDirectoryRecursive "tests/xftp-files")
|
||||
$ do
|
||||
describe "XFTP file chunk delivery" testFileChunkDelivery
|
||||
|
||||
chSize :: Num n => n
|
||||
chSize = 256 * 1024
|
||||
|
||||
createTestChunk :: FilePath -> IO ByteString
|
||||
createTestChunk fp = do
|
||||
bytes <- getRandomBytes chSize
|
||||
withFile fp WriteMode $ \h -> B.hPut h bytes
|
||||
pure bytes
|
||||
|
||||
readChunk :: SenderId -> IO ByteString
|
||||
readChunk sId = B.readFile ("tests/xftp-files/" <> B.unpack (B64.encode sId))
|
||||
|
||||
testFileChunkDelivery :: Spec
|
||||
testFileChunkDelivery =
|
||||
it "should create, upload and receive file chunk" $ do
|
||||
(sndKey, spKey) <- C.generateSignatureKeyPair C.SEd25519
|
||||
(rcvKey, rpKey) <- C.generateSignatureKeyPair C.SEd25519
|
||||
(rDhKey, _rpDhKey) <- C.generateKeyPair'
|
||||
bytes <- createTestChunk "tests/tmp/chunk1"
|
||||
xftpTest $ \c -> runRight_ $ do
|
||||
let file = FileInfo {sndKey, size = chSize, digest = "abc="}
|
||||
(sId, [rId]) <- createXFTPChunk c spKey file [rcvKey]
|
||||
uploadXFTPChunk c spKey sId $ XFTPChunkSpec {filePath = "tests/tmp/chunk1", chunkOffset = 0, chunkSize = chSize}
|
||||
liftIO $ readChunk sId `shouldReturn` bytes
|
||||
(_sDhKey, chunkBody) <- downloadXFTPChunk c rpKey rId rDhKey
|
||||
receiveXFTPChunk chunkBody XFTPChunkSpec {filePath = "tests/tmp/received_chunk1", chunkOffset = 0, chunkSize = chSize}
|
||||
liftIO $ B.readFile "tests/tmp/received_chunk1" `shouldReturn` bytes
|
||||
pure ()
|
||||
Reference in New Issue
Block a user