diff --git a/apps/smp-agent/Main.hs b/apps/smp-agent/Main.hs index 0c51b5619..ce73ccbea 100644 --- a/apps/smp-agent/Main.hs +++ b/apps/smp-agent/Main.hs @@ -21,6 +21,7 @@ servers = InitialAgentServers { smp = M.fromList [(1, L.fromList ["smp://bU0K-bRg24xWW__lS0umO1Zdw_SXqpJNtm1_RrPLViE=@localhost:5223"])], ntf = [], + xftp = M.fromList [], netCfg = defaultNetworkConfig } diff --git a/simplexmq.cabal b/simplexmq.cabal index f88530698..87a5fe16c 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -35,6 +35,7 @@ flag swift library exposed-modules: Simplex.FileTransfer + Simplex.FileTransfer.Agent Simplex.FileTransfer.Client Simplex.FileTransfer.Client.Agent Simplex.FileTransfer.Client.Main @@ -47,6 +48,8 @@ library Simplex.FileTransfer.Server.Store Simplex.FileTransfer.Server.StoreLog Simplex.FileTransfer.Transport + Simplex.FileTransfer.Types + Simplex.FileTransfer.Util Simplex.Messaging.Agent Simplex.Messaging.Agent.Client Simplex.Messaging.Agent.Env.SQLite @@ -72,6 +75,7 @@ library Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230117_fkey_indexes Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230120_delete_errors Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230217_server_key_hash + Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230223_files Simplex.Messaging.Agent.TAsyncs Simplex.Messaging.Agent.TRcvQueues Simplex.Messaging.Client @@ -224,6 +228,7 @@ executable ntf-server , sqlcipher-simple ==0.4.* , stm ==2.5.* , template-haskell ==2.16.* + , temporary ==1.3.* , text ==1.2.* , time ==1.9.* , time-compat ==1.9.* @@ -287,6 +292,7 @@ executable smp-agent , sqlcipher-simple ==0.4.* , stm ==2.5.* , template-haskell ==2.16.* + , temporary ==1.3.* , text ==1.2.* , time ==1.9.* , time-compat ==1.9.* @@ -350,6 +356,7 @@ executable smp-server , sqlcipher-simple ==0.4.* , stm ==2.5.* , template-haskell ==2.16.* + , temporary ==1.3.* , text ==1.2.* , time ==1.9.* , time-compat ==1.9.* @@ -477,6 +484,7 @@ executable xftp-server , sqlcipher-simple ==0.4.* , stm ==2.5.* , template-haskell ==2.16.* + , temporary ==1.3.* , text ==1.2.* , time ==1.9.* , time-compat ==1.9.* @@ -517,6 +525,7 @@ test-suite simplexmq-test ServerTests SMPAgentClient SMPClient + XFTPAgent XFTPCLI XFTPClient XFTPServerTests @@ -568,6 +577,7 @@ test-suite simplexmq-test , sqlcipher-simple ==0.4.* , stm ==2.5.* , template-haskell ==2.16.* + , temporary ==1.3.* , text ==1.2.* , time ==1.9.* , time-compat ==1.9.* diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs new file mode 100644 index 000000000..a989c20fe --- /dev/null +++ b/src/Simplex/FileTransfer/Agent.hs @@ -0,0 +1,182 @@ +{-# LANGUAGE ConstraintKinds #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Simplex.FileTransfer.Agent + ( receiveFile, + ) +where + +import Control.Monad +import Control.Monad.Except +import Control.Monad.Reader +import qualified Data.Attoparsec.ByteString.Char8 as A +import Data.Bifunctor (first) +import qualified Data.ByteString.Lazy.Char8 as LB +import Data.Int (Int64) +import Simplex.FileTransfer.Description +import Simplex.FileTransfer.Protocol (FileParty (..)) +import Simplex.FileTransfer.Transport (XFTPRcvChunkSpec (..)) +import Simplex.FileTransfer.Types +import Simplex.FileTransfer.Util (uniqueCombine) +import Simplex.Messaging.Agent.Client +import Simplex.Messaging.Agent.Env.SQLite +import Simplex.Messaging.Agent.Protocol (ACommand (FRCVD), AParty (..), AgentErrorType (INTERNAL)) +import Simplex.Messaging.Agent.RetryInterval +import Simplex.Messaging.Agent.Store +import Simplex.Messaging.Agent.Store.SQLite +import qualified Simplex.Messaging.Crypto.Lazy as LC +import Simplex.Messaging.Encoding +import Simplex.Messaging.Protocol (XFTPServer) +import qualified Simplex.Messaging.TMap as TM +import Simplex.Messaging.Util (whenM) +import UnliftIO +import UnliftIO.Directory +import qualified UnliftIO.Exception as E + +receiveFile :: AgentMonad m => AgentClient -> UserId -> ValidFileDescription 'FPRecipient -> FilePath -> m Int64 +receiveFile c userId (ValidFileDescription fd@FileDescription {chunks}) xftpPath = do + encPath <- uniqueCombine xftpPath "xftp.encrypted" + createDirectory encPath + fId <- withStore' c $ \db -> createRcvFile db userId fd xftpPath encPath + forM_ chunks downloadChunk + pure fId + where + downloadChunk :: AgentMonad m => FileChunk -> m () + downloadChunk FileChunk {replicas = (FileChunkReplica {server} : _)} = do + addWorker c (Just server) + downloadChunk _ = throwError $ INTERNAL "no replicas" + +addWorker :: AgentMonad m => AgentClient -> Maybe XFTPServer -> m () +addWorker c srv_ = do + ws <- asks $ xftpWorkers . xftpAgent + atomically (TM.lookup srv_ ws) >>= \case + Nothing -> do + doWork <- newTMVarIO () + let runWorker = case srv_ of + Just srv -> runXFTPWorker c srv doWork + Nothing -> runXFTPLocalWorker c doWork + worker <- async $ runWorker `E.finally` atomically (TM.delete srv_ ws) + atomically $ TM.insert srv_ (doWork, worker) ws + Just (doWork, _) -> + void . atomically $ tryPutTMVar doWork () + +runXFTPWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m () +runXFTPWorker c srv doWork = do + forever $ do + void . atomically $ readTMVar doWork + agentOperationBracket c AORcvNetwork throwWhenInactive runXftpOperation + where + runXftpOperation :: m () + runXftpOperation = do + nextChunk <- withStore' c (`getNextRcvChunkToDownload` srv) + case nextChunk of + Nothing -> noWorkToDo + Just fc@RcvFileChunk {nextDelay} -> do + ri <- asks $ reconnectInterval . config + let ri' = maybe ri (\d -> ri {initialInterval = d}) nextDelay + withRetryInterval ri' $ \loop -> + downloadFileChunk fc + `catchError` \e -> do + liftIO $ print e + -- TODO don't loop on permanent errors + -- TODO increase replica retries count + -- TODO update nextDelay (modify withRetryInterval to expose current delay) + loop + noWorkToDo = void . atomically $ tryTakeTMVar doWork + downloadFileChunk :: RcvFileChunk -> m () + downloadFileChunk RcvFileChunk {userId, rcvFileId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, replicas = replica : _} = do + chunkPath <- uniqueCombine fileTmpPath $ show chunkNo + let chunkSpec = XFTPRcvChunkSpec chunkPath (unFileSize chunkSize) (unFileDigest digest) + agentXFTPDownloadChunk c userId replica chunkSpec + fileReceived <- withStore c $ \db -> runExceptT $ do + -- both actions can be done in a single store method + fd <- ExceptT $ updateRcvFileChunkReceived db (rcvChunkReplicaId replica) rcvChunkId rcvFileId chunkPath + let fileReceived = allChunksReceived fd + when fileReceived $ + liftIO $ updateRcvFileStatus db rcvFileId RFSReceived + pure fileReceived + -- check if chunk is downloaded and not acknowledged via flag acknowledged? + -- or just catch and ignore error on acknowledgement? (and remove flag) + -- agentXFTPAckChunk c replicaKey (unChunkReplicaId replicaId) `catchError` \_ -> pure () + when fileReceived $ addWorker c Nothing + where + allChunksReceived :: RcvFile -> Bool + allChunksReceived RcvFile {chunks} = + all (\RcvFileChunk {replicas} -> any received replicas) chunks + downloadFileChunk _ = throwError $ INTERNAL "no replica" + +runXFTPLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m () +runXFTPLocalWorker c@AgentClient {subQ} doWork = do + forever $ do + void . atomically $ readTMVar doWork + runXftpOperation + where + runXftpOperation :: m () + runXftpOperation = do + nextFile <- withStore' c getNextRcvFileToDecrypt + case nextFile of + Nothing -> noWorkToDo + Just fd -> do + ri <- asks $ reconnectInterval . config + withRetryInterval ri $ \loop -> + decryptFile fd + `catchError` \e -> do + liftIO $ print e + -- TODO don't loop on permanent errors + -- TODO fixed number of retries instead of exponential backoff? + loop + noWorkToDo = void . atomically $ tryTakeTMVar doWork + decryptFile :: RcvFile -> m () + decryptFile RcvFile {rcvFileId, key, nonce, tmpPath, saveDir, chunks} = do + -- TODO remove tmpPath if exists + withStore' c $ \db -> updateRcvFileStatus db rcvFileId RFSDecrypting + chunkPaths <- getChunkPaths chunks + encSize <- liftIO $ foldM (\s path -> (s +) . fromIntegral <$> getFileSize path) 0 chunkPaths + path <- decrypt encSize chunkPaths + whenM (doesPathExist tmpPath) $ removeDirectoryRecursive tmpPath + withStore' c $ \db -> updateRcvFileComplete db rcvFileId path + notify $ FRCVD rcvFileId path + where + notify :: ACommand 'Agent -> m () + notify cmd = atomically $ writeTBQueue subQ ("", "", cmd) + getChunkPaths :: [RcvFileChunk] -> m [FilePath] + getChunkPaths [] = pure [] + getChunkPaths (RcvFileChunk {chunkTmpPath = Just path} : cs) = do + ps <- getChunkPaths cs + pure $ path : ps + getChunkPaths (RcvFileChunk {chunkTmpPath = Nothing} : _cs) = + throwError $ INTERNAL "no chunk path" + decrypt :: Int64 -> [FilePath] -> m FilePath + decrypt encSize chunkPaths = do + lazyChunks <- readChunks chunkPaths + (authOk, f) <- liftEither . first cryptoError $ LC.sbDecryptTailTag key nonce (encSize - authTagSize) lazyChunks + let (fileHdr, f') = LB.splitAt 1024 f + -- withFile encPath ReadMode $ \r -> do + -- fileHdr <- liftIO $ B.hGet r 1024 + case A.parse smpP $ LB.toStrict fileHdr of + -- TODO XFTP errors + A.Fail _ _ e -> throwError $ INTERNAL $ "Invalid file header: " <> e + A.Partial _ -> throwError $ INTERNAL "Invalid file header" + A.Done rest FileHeader {fileName} -> do + -- TODO touch file in agent bracket + path <- uniqueCombine saveDir fileName + liftIO $ LB.writeFile path $ LB.fromStrict rest <> f' + unless authOk $ do + removeFile path + throwError $ INTERNAL "Error decrypting file: incorrect auth tag" + pure path + readChunks :: [FilePath] -> m LB.ByteString + readChunks = + foldM + ( \s path -> do + chunk <- liftIO $ LB.readFile path + pure $ s <> chunk + ) + LB.empty diff --git a/src/Simplex/FileTransfer/Client.hs b/src/Simplex/FileTransfer/Client.hs index ff130b20e..124f433b5 100644 --- a/src/Simplex/FileTransfer/Client.hs +++ b/src/Simplex/FileTransfer/Client.hs @@ -16,6 +16,7 @@ import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Int (Int64) import Data.List.NonEmpty (NonEmpty (..)) +import Data.Time (UTCTime) import Data.Word (Word32) import qualified Network.HTTP.Types as N import qualified Network.HTTP2.Client as H @@ -34,6 +35,7 @@ import Simplex.Messaging.Client import Simplex.Messaging.Client.Agent () import qualified Simplex.Messaging.Crypto as C import qualified Simplex.Messaging.Crypto.Lazy as LC +import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol ( BasicAuth, Protocol (..), @@ -42,7 +44,7 @@ import Simplex.Messaging.Protocol SenderId, ) import Simplex.Messaging.Transport (supportedParameters) -import Simplex.Messaging.Transport.Client (TransportClientConfig) +import Simplex.Messaging.Transport.Client (TransportClientConfig, TransportHost) import Simplex.Messaging.Transport.HTTP2 import Simplex.Messaging.Transport.HTTP2.Client import Simplex.Messaging.Util (bshow, liftEitherError, whenM) @@ -51,11 +53,12 @@ import UnliftIO.Directory data XFTPClient = XFTPClient { http2Client :: HTTP2Client, + transportSession :: TransportSession FileResponse, config :: XFTPClientConfig } data XFTPClientConfig = XFTPClientConfig - { networkConfig :: NetworkConfig, + { xftpNetworkConfig :: NetworkConfig, uploadTimeoutPerMb :: Int } @@ -77,26 +80,41 @@ type XFTPClientError = ProtocolClientError XFTPErrorType defaultXFTPClientConfig :: XFTPClientConfig defaultXFTPClientConfig = XFTPClientConfig - { networkConfig = defaultNetworkConfig, + { xftpNetworkConfig = defaultNetworkConfig, uploadTimeoutPerMb = 10000000 -- 10 seconds } -getXFTPClient :: TransportSession FileResponse -> XFTPClientConfig -> IO () -> IO (Either XFTPClientError XFTPClient) -getXFTPClient transportSession@(_, srv, _) config@XFTPClientConfig {networkConfig} disconnected = runExceptT $ do - let tcConfig = transportClientConfig networkConfig +getXFTPClient :: TransportSession FileResponse -> XFTPClientConfig -> (XFTPClient -> IO ()) -> IO (Either XFTPClientError XFTPClient) +getXFTPClient transportSession@(_, srv, _) config@XFTPClientConfig {xftpNetworkConfig} disconnected = runExceptT $ do + let tcConfig = transportClientConfig xftpNetworkConfig http2Config = xftpHTTP2Config tcConfig config username = proxyUsername transportSession ProtocolServer _ host port keyHash = srv - useHost <- liftEither $ chooseTransportHost networkConfig host + useHost <- liftEither $ chooseTransportHost xftpNetworkConfig host + clientVar <- newTVarIO Nothing let usePort = if null port then "443" else port - http2Client <- liftEitherError xftpClientError $ getVerifiedHTTP2Client (Just username) useHost usePort (Just keyHash) Nothing http2Config disconnected - pure XFTPClient {http2Client, config} + clientDisconnected = readTVarIO clientVar >>= mapM_ disconnected + http2Client <- liftEitherError xftpClientError $ getVerifiedHTTP2Client (Just username) useHost usePort (Just keyHash) Nothing http2Config clientDisconnected + let c = XFTPClient {http2Client, transportSession, config} + atomically $ writeTVar clientVar $ Just c + pure c closeXFTPClient :: XFTPClient -> IO () closeXFTPClient XFTPClient {http2Client} = closeHTTP2Client http2Client +xftpClientServer :: XFTPClient -> String +xftpClientServer = B.unpack . strEncode . snd3 . transportSession + where + snd3 (_, s, _) = s + +xftpTransportHost :: XFTPClient -> TransportHost +xftpTransportHost = (host :: HClient -> TransportHost) . client_ . http2Client + +xftpSessionTs :: XFTPClient -> UTCTime +xftpSessionTs = sessionTs . http2Client + xftpHTTP2Config :: TransportClientConfig -> XFTPClientConfig -> HTTP2ClientConfig -xftpHTTP2Config transportConfig XFTPClientConfig {networkConfig = NetworkConfig {tcpConnectTimeout}} = +xftpHTTP2Config transportConfig XFTPClientConfig {xftpNetworkConfig = NetworkConfig {tcpConnectTimeout}} = defaultHTTP2ClientConfig { bodyHeadSize = xftpBlockSize, suportedTLSParams = supportedParameters, diff --git a/src/Simplex/FileTransfer/Client/Agent.hs b/src/Simplex/FileTransfer/Client/Agent.hs index e30956390..56ec9f0f6 100644 --- a/src/Simplex/FileTransfer/Client/Agent.hs +++ b/src/Simplex/FileTransfer/Client/Agent.hs @@ -68,8 +68,8 @@ getXFTPServerClient XFTPClientAgent {xftpClients, config} srv = do first (XFTPClientAgentError srv) <$> getXFTPClient (1, srv, Nothing) (xftpConfig config) clientDisconnected - clientDisconnected :: IO () - clientDisconnected = do + clientDisconnected :: XFTPClient -> IO () + clientDisconnected _ = do atomically $ TM.delete srv xftpClients logInfo $ "disconnected from " <> showServer srv @@ -84,7 +84,7 @@ getXFTPServerClient XFTPClientAgent {xftpClients, config} srv = do waitForXFTPClient :: XFTPClientVar -> ME XFTPClient waitForXFTPClient clientVar = do - let XFTPClientConfig {networkConfig = NetworkConfig {tcpConnectTimeout}} = xftpConfig config + let XFTPClientConfig {xftpNetworkConfig = NetworkConfig {tcpConnectTimeout}} = xftpConfig config client_ <- tcpConnectTimeout `timeout` atomically (readTMVar clientVar) liftEither $ case client_ of Just (Right c) -> Right c @@ -121,7 +121,7 @@ closeXFTPServerClient XFTPClientAgent {xftpClients, config} srv = atomically (TM.lookupDelete srv xftpClients) >>= mapM_ closeClient where closeClient cVar = do - let NetworkConfig {tcpConnectTimeout} = networkConfig $ xftpConfig config + let NetworkConfig {tcpConnectTimeout} = xftpNetworkConfig $ xftpConfig config tcpConnectTimeout `timeout` atomically (readTMVar cVar) >>= \case Just (Right client) -> closeXFTPClient client `catchAll_` pure () _ -> pure () diff --git a/src/Simplex/FileTransfer/Client/Main.hs b/src/Simplex/FileTransfer/Client/Main.hs index 3806df586..e0f113ba5 100644 --- a/src/Simplex/FileTransfer/Client/Main.hs +++ b/src/Simplex/FileTransfer/Client/Main.hs @@ -43,6 +43,8 @@ import Simplex.FileTransfer.Client.Agent import Simplex.FileTransfer.Description import Simplex.FileTransfer.Protocol import Simplex.FileTransfer.Transport (XFTPRcvChunkSpec (..)) +import Simplex.FileTransfer.Types +import Simplex.FileTransfer.Util (uniqueCombine) import qualified Simplex.Messaging.Crypto as C import qualified Simplex.Messaging.Crypto.Lazy as LC import Simplex.Messaging.Encoding @@ -52,7 +54,7 @@ import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), SenderId, SndPrivat import Simplex.Messaging.Server.CLI (getCliCommand') import Simplex.Messaging.Util (ifM, tshow, whenM) import System.Exit (exitFailure) -import System.FilePath (splitExtensions, splitFileName, ()) +import System.FilePath (splitFileName, ()) import System.IO.Temp (getCanonicalTemporaryDirectory) import System.Random (StdGen, newStdGen, randomR) import UnliftIO @@ -79,9 +81,6 @@ maxFileSizeStr = B.unpack . strEncode $ FileSize maxFileSize fileSizeLen :: Int64 fileSizeLen = 8 -authTagSize :: Int64 -authTagSize = fromIntegral C.authTagSize - newtype CLIError = CLIError String deriving (Eq, Show, Exception) @@ -257,19 +256,6 @@ runE a = Left (CLIError e) -> putStrLn e >> exitFailure _ -> pure () --- fileExtra is added to allow header extension in future versions -data FileHeader = FileHeader - { fileName :: String, - fileExtra :: Maybe String - } - deriving (Eq, Show) - -instance Encoding FileHeader where - smpEncode FileHeader {fileName, fileExtra} = smpEncode (fileName, fileExtra) - smpP = do - (fileName, fileExtra) <- smpP - pure FileHeader {fileName, fileExtra} - cliSendFile :: SendOptions -> ExceptT CLIError IO () cliSendFile SendOptions {filePath, outputDir, numRecipients, xftpServers, retryCount, tempPath, verbose} = do let (_, fileName) = splitFileName filePath @@ -578,15 +564,6 @@ prepareChunkSpecs filePath chunkSizes = reverse . snd $ foldl' addSpec (0, []) c getEncPath :: MonadIO m => Maybe FilePath -> String -> m FilePath getEncPath path name = (`uniqueCombine` (name <> ".encrypted")) =<< maybe (liftIO getCanonicalTemporaryDirectory) pure path -uniqueCombine :: MonadIO m => FilePath -> String -> m FilePath -uniqueCombine filePath fileName = tryCombine (0 :: Int) - where - tryCombine n = - let (name, ext) = splitExtensions fileName - suffix = if n == 0 then "" else "_" <> show n - f = filePath (name <> suffix <> ext) - in ifM (doesPathExist f) (tryCombine $ n + 1) (pure f) - withReconnect :: Show e => XFTPClientAgent -> XFTPServer -> Int -> (XFTPClient -> ExceptT e IO a) -> ExceptT CLIError IO a withReconnect a srv n run = withRetry n $ do c <- withRetry n $ getXFTPServerClient a srv diff --git a/src/Simplex/FileTransfer/Description.hs b/src/Simplex/FileTransfer/Description.hs index cee74ff9a..ff3d7e144 100644 --- a/src/Simplex/FileTransfer/Description.hs +++ b/src/Simplex/FileTransfer/Description.hs @@ -39,6 +39,8 @@ import Data.Aeson (FromJSON, ToJSON) import qualified Data.Aeson as J import Data.Attoparsec.ByteString.Char8 (Parser) import qualified Data.Attoparsec.ByteString.Char8 as A +import Database.SQLite.Simple.FromField (FromField (..)) +import Database.SQLite.Simple.ToField (ToField (..)) import Data.Bifunctor (first) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B @@ -96,6 +98,10 @@ instance ToJSON FileDigest where toJSON = strToJSON toEncoding = strToJEncoding +instance FromField FileDigest where fromField f = FileDigest <$> fromField f + +instance ToField FileDigest where toField (FileDigest s) = toField s + data FileChunk = FileChunk { chunkNo :: Int, chunkSize :: FileSize Word32, @@ -125,6 +131,10 @@ instance ToJSON ChunkReplicaId where toJSON = strToJSON toEncoding = strToJEncoding +instance FromField ChunkReplicaId where fromField f = ChunkReplicaId <$> fromField f + +instance ToField ChunkReplicaId where toField (ChunkReplicaId s) = toField s + data YAMLFileDescription = YAMLFileDescription { party :: FileParty, size :: String, @@ -226,6 +236,10 @@ gb n = 1024 * mb n instance (Integral a, Show a) => IsString (FileSize a) where fromString = either error id . strDecode . B.pack +instance (FromField a) => FromField (FileSize a) where fromField f = FileSize <$> fromField f + +instance (ToField a) => ToField (FileSize a) where toField (FileSize s) = toField s + groupReplicasByServer :: FileSize Word32 -> [FileChunk] -> [[FileServerReplica]] groupReplicasByServer defChunkSize = groupBy ((==) `on` replicaServer) diff --git a/src/Simplex/FileTransfer/Protocol.hs b/src/Simplex/FileTransfer/Protocol.hs index 954b3e81b..8f793114f 100644 --- a/src/Simplex/FileTransfer/Protocol.hs +++ b/src/Simplex/FileTransfer/Protocol.hs @@ -13,6 +13,7 @@ module Simplex.FileTransfer.Protocol where +import Control.Applicative ((<|>)) import Data.Aeson (FromJSON, ToJSON) import qualified Data.Aeson as J import qualified Data.Attoparsec.ByteString.Char8 as A @@ -25,6 +26,7 @@ import Data.Maybe (isNothing) import Data.Type.Equality import Data.Word (Word32) import GHC.Generics (Generic) +import Generic.Random (genericArbitraryU) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String @@ -55,8 +57,9 @@ import Simplex.Messaging.Protocol _smpP, ) import Simplex.Messaging.Transport (SessionId, TransportError (..)) -import Simplex.Messaging.Util ((<$?>)) +import Simplex.Messaging.Util (bshow, (<$?>)) import Simplex.Messaging.Version +import Test.QuickCheck (Arbitrary (..)) currentXFTPVersion :: Version currentXFTPVersion = 1 @@ -354,6 +357,18 @@ data XFTPErrorType DUPLICATE_ -- not part of SMP protocol, used internally deriving (Eq, Generic, Read, Show) +instance ToJSON XFTPErrorType where + toJSON = J.genericToJSON $ sumTypeJSON id + toEncoding = J.genericToEncoding $ sumTypeJSON id + +instance StrEncoding XFTPErrorType where + strEncode = \case + CMD e -> "CMD " <> bshow e + e -> bshow e + strP = "CMD " *> (CMD <$> parseRead1) <|> parseRead1 + +instance Arbitrary XFTPErrorType where arbitrary = genericArbitraryU + instance Encoding XFTPErrorType where smpEncode = \case BLOCK -> "BLOCK" diff --git a/src/Simplex/FileTransfer/Types.hs b/src/Simplex/FileTransfer/Types.hs new file mode 100644 index 000000000..dc6bd8f84 --- /dev/null +++ b/src/Simplex/FileTransfer/Types.hs @@ -0,0 +1,102 @@ +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} + +module Simplex.FileTransfer.Types where + +import Data.Int (Int64) +import Data.Word (Word32) +import Database.SQLite.Simple.FromField (FromField (..)) +import Database.SQLite.Simple.ToField (ToField (..)) +import Simplex.FileTransfer.Description +import qualified Simplex.Messaging.Crypto as C +import Simplex.Messaging.Encoding +import Simplex.Messaging.Encoding.String +import Simplex.Messaging.Parsers (fromTextField_) +import Simplex.Messaging.Protocol + +authTagSize :: Int64 +authTagSize = fromIntegral C.authTagSize + +-- fileExtra is added to allow header extension in future versions +data FileHeader = FileHeader + { fileName :: String, + fileExtra :: Maybe String + } + deriving (Eq, Show) + +instance Encoding FileHeader where + smpEncode FileHeader {fileName, fileExtra} = smpEncode (fileName, fileExtra) + smpP = do + (fileName, fileExtra) <- smpP + pure FileHeader {fileName, fileExtra} + +type RcvFileId = Int64 + +data RcvFile = RcvFile + { userId :: Int64, + rcvFileId :: RcvFileId, + size :: FileSize Int64, + digest :: FileDigest, + key :: C.SbKey, + nonce :: C.CbNonce, + chunkSize :: FileSize Word32, + chunks :: [RcvFileChunk], + tmpPath :: FilePath, + saveDir :: FilePath, + savePath :: Maybe FilePath, + status :: RcvFileStatus, + status :: RcvFileStatus + } + deriving (Eq, Show) + +-- TODO add error status? +data RcvFileStatus + = RFSReceiving + | RFSReceived + | RFSDecrypting + | RFSComplete + deriving (Eq, Show) + +instance FromField RcvFileStatus where fromField = fromTextField_ textDecode + +instance ToField RcvFileStatus where toField = toField . textEncode + +instance TextEncoding RcvFileStatus where + textDecode = \case + "receiving" -> Just RFSReceiving + "received" -> Just RFSReceived + "decrypting" -> Just RFSDecrypting + "complete" -> Just RFSComplete + _ -> Nothing + textEncode = \case + RFSReceiving -> "receiving" + RFSReceived -> "received" + RFSDecrypting -> "decrypting" + RFSComplete -> "complete" + +data RcvFileChunk = RcvFileChunk + { userId :: Int64, + rcvFileId :: RcvFileId, + rcvChunkId :: Int64, + chunkNo :: Int, + chunkSize :: FileSize Word32, + digest :: FileDigest, + replicas :: [RcvFileChunkReplica], + fileTmpPath :: FilePath, + chunkTmpPath :: Maybe FilePath, + nextDelay :: Maybe Int + } + deriving (Eq, Show) + +data RcvFileChunkReplica = RcvFileChunkReplica + { rcvChunkReplicaId :: Int64, + server :: XFTPServer, + replicaId :: ChunkReplicaId, + replicaKey :: C.APrivateSignKey, + received :: Bool, + acknowledged :: Bool, + retries :: Int + } + deriving (Eq, Show) diff --git a/src/Simplex/FileTransfer/Util.hs b/src/Simplex/FileTransfer/Util.hs new file mode 100644 index 000000000..f8ac3d1f8 --- /dev/null +++ b/src/Simplex/FileTransfer/Util.hs @@ -0,0 +1,18 @@ +module Simplex.FileTransfer.Util + ( uniqueCombine, + ) +where + +import Simplex.Messaging.Util (ifM) +import System.FilePath (splitExtensions, ()) +import UnliftIO +import UnliftIO.Directory + +uniqueCombine :: MonadIO m => FilePath -> String -> m FilePath +uniqueCombine filePath fileName = tryCombine (0 :: Int) + where + tryCombine n = + let (name, ext) = splitExtensions fileName + suffix = if n == 0 then "" else "_" <> show n + f = filePath (name <> suffix <> ext) + in ifM (doesPathExist f) (tryCombine $ n + 1) (pure f) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 1a0de3be0..80749ac6d 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -80,6 +80,7 @@ module Simplex.Messaging.Agent getNtfToken, getNtfTokenData, toggleConnectionNtfs, + xftpReceiveFile, activateAgent, suspendAgent, execAgentStoreSQL, @@ -113,6 +114,10 @@ import qualified Data.Text as T import Data.Time.Clock import Data.Time.Clock.System (systemToUTCTime) import qualified Database.SQLite.Simple as DB +import Simplex.FileTransfer.Agent (receiveFile) +import Simplex.FileTransfer.Description (ValidFileDescription) +import Simplex.FileTransfer.Protocol (FileParty (..)) +import Simplex.FileTransfer.Types (RcvFileId) import Simplex.Messaging.Agent.Client import Simplex.Messaging.Agent.Env.SQLite import Simplex.Messaging.Agent.Lock (withLock) @@ -322,6 +327,10 @@ getNtfTokenData c = withAgentEnv c $ getNtfTokenData' c toggleConnectionNtfs :: AgentErrorMonad m => AgentClient -> ConnId -> Bool -> m () toggleConnectionNtfs c = withAgentEnv c .: toggleConnectionNtfs' c +-- | Receive XFTP file +xftpReceiveFile :: AgentErrorMonad m => AgentClient -> UserId -> ValidFileDescription 'FPRecipient -> FilePath -> m RcvFileId +xftpReceiveFile c = withAgentEnv c .:. receiveFile c + -- | Activate operations activateAgent :: AgentErrorMonad m => AgentClient -> m () activateAgent c = withAgentEnv c $ activateAgent' c diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index dbc22684d..95eb48d3a 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -15,6 +15,7 @@ {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TupleSections #-} {-# LANGUAGE TypeApplications #-} +{-# LANGUAGE TypeFamilyDependencies #-} module Simplex.Messaging.Agent.Client ( AgentClient (..), @@ -48,6 +49,7 @@ module Simplex.Messaging.Agent.Client agentNtfCreateSubscription, agentNtfCheckSubscription, agentNtfDeleteSubscription, + agentXFTPDownloadChunk, agentCbEncrypt, agentCbDecrypt, cryptoError, @@ -108,10 +110,17 @@ import Data.Maybe (isJust, listToMaybe) import Data.Set (Set) import qualified Data.Set as S import Data.Text.Encoding +import Data.Time (UTCTime) import Data.Word (Word16) import qualified Database.SQLite.Simple as DB import GHC.Generics (Generic) import Network.Socket (HostName) +import Simplex.FileTransfer.Client (XFTPClient, XFTPClientConfig (..)) +import qualified Simplex.FileTransfer.Client as X +import Simplex.FileTransfer.Description (ChunkReplicaId (..)) +import Simplex.FileTransfer.Protocol (FileResponse, XFTPErrorType) +import Simplex.FileTransfer.Transport (XFTPRcvChunkSpec) +import Simplex.FileTransfer.Types (RcvFileChunkReplica (..)) import Simplex.Messaging.Agent.Env.SQLite import Simplex.Messaging.Agent.Lock import Simplex.Messaging.Agent.Protocol @@ -151,6 +160,7 @@ import Simplex.Messaging.Protocol RcvNtfPublicDhKey, SMPMsgMeta (..), SndPublicVerifyKey, + XFTPServerWithAuth, ) import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.TMap (TMap) @@ -163,16 +173,20 @@ import UnliftIO (mapConcurrently) import qualified UnliftIO.Exception as E import UnliftIO.STM -type ClientVar err msg = TMVar (Either AgentErrorType (ProtocolClient err msg)) +type ClientVar msg = TMVar (Either AgentErrorType (Client msg)) -type SMPClientVar = TMVar (Either AgentErrorType SMPClient) +type SMPClientVar = ClientVar SMP.BrokerMsg -type NtfClientVar = TMVar (Either AgentErrorType NtfClient) +type NtfClientVar = ClientVar NtfResponse + +type XFTPClientVar = TMVar (Either AgentErrorType XFTPClient) type SMPTransportSession = TransportSession SMP.BrokerMsg type NtfTransportSession = TransportSession NtfResponse +type XFTPTransportSession = TransportSession FileResponse + data AgentClient = AgentClient { active :: TVar Bool, rcvQ :: TBQueue (ATransmission 'Client), @@ -182,6 +196,8 @@ data AgentClient = AgentClient smpClients :: TMap SMPTransportSession SMPClientVar, ntfServers :: TVar [NtfServer], ntfClients :: TMap NtfTransportSession NtfClientVar, + xftpServers :: TMap UserId (NonEmpty XFTPServerWithAuth), + xftpClients :: TMap XFTPTransportSession XFTPClientVar, useNetworkConfig :: TVar NetworkConfig, subscrConns :: TVar (Set ConnId), activeSubs :: TRcvQueues, @@ -246,7 +262,7 @@ data AgentStatsKey = AgentStatsKey deriving (Eq, Ord, Show) newAgentClient :: InitialAgentServers -> Env -> STM AgentClient -newAgentClient InitialAgentServers {smp, ntf, netCfg} agentEnv = do +newAgentClient InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = do let qSize = tbqSize $ config agentEnv active <- newTVar True rcvQ <- newTBQueue qSize @@ -256,6 +272,8 @@ newAgentClient InitialAgentServers {smp, ntf, netCfg} agentEnv = do smpClients <- TM.empty ntfServers <- newTVar ntf ntfClients <- TM.empty + xftpServers <- newTVar xftp + xftpClients <- TM.empty useNetworkConfig <- newTVar netCfg subscrConns <- newTVar S.empty activeSubs <- RQ.empty @@ -290,6 +308,8 @@ newAgentClient InitialAgentServers {smp, ntf, netCfg} agentEnv = do smpClients, ntfServers, ntfClients, + xftpServers, + xftpClients, useNetworkConfig, subscrConns, activeSubs, @@ -320,17 +340,41 @@ newAgentClient InitialAgentServers {smp, ntf, netCfg} agentEnv = do agentClientStore :: AgentClient -> SQLiteStore agentClientStore AgentClient {agentEnv = Env {store}} = store -class ProtocolServerClient err msg | msg -> err where - getProtocolServerClient :: AgentMonad m => AgentClient -> TransportSession msg -> m (ProtocolClient err msg) +class (Encoding err, Show err) => ProtocolServerClient err msg | msg -> err where + type Client msg = c | c -> msg + getProtocolServerClient :: AgentMonad m => AgentClient -> TransportSession msg -> m (Client msg) clientProtocolError :: err -> AgentErrorType + closeProtocolServerClient :: Client msg -> IO () + clientServer :: Client msg -> String + clientTransportHost :: Client msg -> TransportHost + clientSessionTs :: Client msg -> UTCTime instance ProtocolServerClient ErrorType BrokerMsg where + type Client BrokerMsg = ProtocolClient ErrorType BrokerMsg getProtocolServerClient = getSMPServerClient clientProtocolError = SMP + closeProtocolServerClient = closeProtocolClient + clientServer = protocolClientServer + clientTransportHost = transportHost' + clientSessionTs = sessionTs instance ProtocolServerClient ErrorType NtfResponse where + type Client NtfResponse = ProtocolClient ErrorType NtfResponse getProtocolServerClient = getNtfServerClient clientProtocolError = NTF + closeProtocolServerClient = closeProtocolClient + clientServer = protocolClientServer + clientTransportHost = transportHost' + clientSessionTs = sessionTs + +instance ProtocolServerClient XFTPErrorType FileResponse where + type Client FileResponse = XFTPClient + getProtocolServerClient = getXFTPServerClient + clientProtocolError = XFTP + closeProtocolServerClient = X.closeXFTPClient + clientServer = X.xftpClientServer + clientTransportHost = X.xftpTransportHost + clientSessionTs = X.xftpSessionTs getSMPServerClient :: forall m. AgentMonad m => AgentClient -> SMPTransportSession -> m SMPClient getSMPServerClient c@AgentClient {active, smpClients, msgQ} tSess@(userId, srv, _) = do @@ -420,6 +464,27 @@ getNtfServerClient c@AgentClient {active, ntfClients} tSess@(userId, srv, _) = d atomically $ writeTBQueue (subQ c) ("", "", hostEvent DISCONNECT client) logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv +getXFTPServerClient :: forall m. AgentMonad m => AgentClient -> XFTPTransportSession -> m XFTPClient +getXFTPServerClient c@AgentClient {active, xftpClients, useNetworkConfig} tSess@(userId, srv, _) = do + unlessM (readTVarIO active) . throwError $ INTERNAL "agent is stopped" + atomically (getClientVar tSess xftpClients) + >>= either + (newProtocolClient c tSess xftpClients connectClient $ \_ _ -> pure ()) + (waitForProtocolClient c tSess) + where + connectClient :: m XFTPClient + connectClient = do + cfg <- asks $ xftpCfg . config + xftpNetworkConfig <- readTVarIO useNetworkConfig + liftEitherError (protocolClientError XFTP $ B.unpack $ strEncode srv) (X.getXFTPClient tSess cfg {xftpNetworkConfig} clientDisconnected) + + clientDisconnected :: XFTPClient -> IO () + clientDisconnected client = do + atomically $ TM.delete tSess xftpClients + incClientStat c userId client "DISCONNECT" "" + atomically $ writeTBQueue (subQ c) ("", "", hostEvent DISCONNECT client) + logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv + getClientVar :: forall a s. TransportSession s -> TMap (TransportSession s) (TMVar a) -> STM (Either (TMVar a) (TMVar a)) getClientVar tSess clients = maybe (Left <$> newClientVar) (pure . Right) =<< TM.lookup tSess clients where @@ -429,7 +494,7 @@ getClientVar tSess clients = maybe (Left <$> newClientVar) (pure . Right) =<< TM TM.insert tSess var clients pure var -waitForProtocolClient :: (AgentMonad m, ProtocolTypeI (ProtoType msg)) => AgentClient -> TransportSession msg -> ClientVar err msg -> m (ProtocolClient err msg) +waitForProtocolClient :: (AgentMonad m, ProtocolTypeI (ProtoType msg)) => AgentClient -> TransportSession msg -> ClientVar msg -> m (Client msg) waitForProtocolClient c (_, srv, _) clientVar = do NetworkConfig {tcpConnectTimeout} <- readTVarIO $ useNetworkConfig c client_ <- liftIO $ tcpConnectTimeout `timeout` atomically (readTMVar clientVar) @@ -440,17 +505,17 @@ waitForProtocolClient c (_, srv, _) clientVar = do newProtocolClient :: forall err msg m. - (AgentMonad m, ProtocolTypeI (ProtoType msg)) => + (AgentMonad m, ProtocolTypeI (ProtoType msg), ProtocolServerClient err msg) => AgentClient -> TransportSession msg -> - TMap (TransportSession msg) (ClientVar err msg) -> - m (ProtocolClient err msg) -> + TMap (TransportSession msg) (ClientVar msg) -> + m (Client msg) -> (AgentClient -> TransportSession msg -> m ()) -> - ClientVar err msg -> - m (ProtocolClient err msg) + ClientVar msg -> + m (Client msg) newProtocolClient c tSess@(userId, srv, entityId_) clients connectClient reconnectClient clientVar = tryConnectClient pure tryConnectAsync where - tryConnectClient :: (ProtocolClient err msg -> m a) -> m () -> m a + tryConnectClient :: (Client msg -> m a) -> m () -> m a tryConnectClient successAction retryAction = tryError connectClient >>= \r -> case r of Right client -> do @@ -475,8 +540,8 @@ newProtocolClient c tSess@(userId, srv, entityId_) clients connectClient reconne withRetryInterval ri $ \loop -> void $ tryConnectClient (const $ reconnectClient c tSess) loop atomically . removeAsyncAction aId $ asyncClients c -hostEvent :: forall err msg. ProtocolTypeI (ProtoType msg) => (AProtocolType -> TransportHost -> ACommand 'Agent) -> ProtocolClient err msg -> ACommand 'Agent -hostEvent event client = event (AProtocolType $ protocolTypeI @(ProtoType msg)) $ transportHost' client +hostEvent :: forall err msg. (ProtocolTypeI (ProtoType msg), ProtocolServerClient err msg) => (AProtocolType -> TransportHost -> ACommand 'Agent) -> Client msg -> ACommand 'Agent +hostEvent event = event (AProtocolType $ protocolTypeI @(ProtoType msg)) . clientTransportHost getClientConfig :: AgentMonad m => AgentClient -> (AgentConfig -> ProtocolClientConfig) -> m ProtocolClientConfig getClientConfig AgentClient {useNetworkConfig} cfgSel = do @@ -519,7 +584,7 @@ throwWhenNoDelivery c SndQueue {server, sndId} = where k = (server, sndId) -closeProtocolServerClients :: AgentClient -> (AgentClient -> TMap (TransportSession msg) (ClientVar err msg)) -> IO () +closeProtocolServerClients :: ProtocolServerClient err msg => AgentClient -> (AgentClient -> TMap (TransportSession msg) (ClientVar msg)) -> IO () closeProtocolServerClients c clientsSel = atomically (swapTVar cs M.empty) >>= mapM_ (forkIO . closeClient) where @@ -527,7 +592,7 @@ closeProtocolServerClients c clientsSel = closeClient cVar = do NetworkConfig {tcpConnectTimeout} <- readTVarIO $ useNetworkConfig c tcpConnectTimeout `timeout` atomically (readTMVar cVar) >>= \case - Just (Right client) -> closeProtocolClient client `catchAll_` pure () + Just (Right client) -> closeProtocolServerClient client `catchAll_` pure () _ -> pure () cancelActions :: (Foldable f, Monoid (f (Async ()))) => TVar (f (Async ())) -> IO () @@ -542,29 +607,29 @@ withLockMap_ locks key = withGetLock $ TM.lookup key locks >>= maybe newLock pur where newLock = createLock >>= \l -> TM.insert key l locks $> l -withClient_ :: forall a m err msg. (AgentMonad m, ProtocolServerClient err msg) => AgentClient -> TransportSession msg -> ByteString -> (ProtocolClient err msg -> m a) -> m a +withClient_ :: forall a m err msg. (AgentMonad m, ProtocolServerClient err msg) => AgentClient -> TransportSession msg -> ByteString -> (Client msg -> m a) -> m a withClient_ c tSess@(userId, srv, _) statCmd action = do cl <- getProtocolServerClient c tSess (action cl <* stat cl "OK") `catchError` logServerError cl where stat cl = liftIO . incClientStat c userId cl statCmd - logServerError :: ProtocolClient err msg -> AgentErrorType -> m a + logServerError :: Client msg -> AgentErrorType -> m a logServerError cl e = do logServer "<--" c srv "" $ strEncode e stat cl $ strEncode e throwError e -withLogClient_ :: (AgentMonad m, ProtocolServerClient err msg) => AgentClient -> TransportSession msg -> EntityId -> ByteString -> (ProtocolClient err msg -> m a) -> m a +withLogClient_ :: (AgentMonad m, ProtocolServerClient err msg) => AgentClient -> TransportSession msg -> EntityId -> ByteString -> (Client msg -> m a) -> m a withLogClient_ c tSess@(_, srv, _) entId cmdStr action = do logServer "-->" c srv entId cmdStr res <- withClient_ c tSess cmdStr action logServer "<--" c srv entId "OK" return res -withClient :: forall m err msg a. (AgentMonad m, ProtocolServerClient err msg, ProtocolTypeI (ProtoType msg), Encoding err, Show err) => AgentClient -> TransportSession msg -> ByteString -> (ProtocolClient err msg -> ExceptT (ProtocolClientError err) IO a) -> m a +withClient :: forall m err msg a. (AgentMonad m, ProtocolServerClient err msg) => AgentClient -> TransportSession msg -> ByteString -> (Client msg -> ExceptT (ProtocolClientError err) IO a) -> m a withClient c tSess statKey action = withClient_ c tSess statKey $ \client -> liftClient (clientProtocolError @err @msg) (clientServer client) $ action client -withLogClient :: forall m err msg a. (AgentMonad m, ProtocolServerClient err msg, ProtocolTypeI (ProtoType msg), Encoding err, Show err) => AgentClient -> TransportSession msg -> EntityId -> ByteString -> (ProtocolClient err msg -> ExceptT (ProtocolClientError err) IO a) -> m a +withLogClient :: forall m err msg a. (AgentMonad m, ProtocolServerClient err msg) => AgentClient -> TransportSession msg -> EntityId -> ByteString -> (Client msg -> ExceptT (ProtocolClientError err) IO a) -> m a withLogClient c tSess entId cmdStr action = withLogClient_ c tSess entId cmdStr $ \client -> liftClient (clientProtocolError @err @msg) (clientServer client) $ action client withSMPClient :: (AgentMonad m, SMPQueueRec q) => AgentClient -> q -> ByteString -> (SMPClient -> ExceptT SMPClientError IO a) -> m a @@ -580,6 +645,17 @@ withSMPClient_ c q cmdStr action = do withNtfClient :: forall m a. AgentMonad m => AgentClient -> NtfServer -> EntityId -> ByteString -> (NtfClient -> ExceptT NtfClientError IO a) -> m a withNtfClient c srv = withLogClient c (0, srv, Nothing) +withXFTPClient :: + (AgentMonad m, ProtocolServerClient err msg) => + AgentClient -> + (UserId, ProtoServer msg, EntityId) -> + ByteString -> + (Client msg -> ExceptT (ProtocolClientError err) IO b) -> + m b +withXFTPClient c (userId, srv, fId) cmdStr action = do + tSess <- mkTransportSession c userId srv fId + withLogClient c tSess (strEncode fId) cmdStr action + liftClient :: (AgentMonad m, Show err, Encoding err) => (err -> AgentErrorType) -> HostName -> ExceptT (ProtocolClientError err) IO a -> m a liftClient protocolError_ = liftError . protocolClientError protocolError_ @@ -912,6 +988,10 @@ agentNtfDeleteSubscription :: AgentMonad m => AgentClient -> NtfSubscriptionId - agentNtfDeleteSubscription c subId NtfToken {ntfServer, ntfPrivKey} = withNtfClient c ntfServer subId "SDEL" $ \ntf -> ntfDeleteSubscription ntf ntfPrivKey subId +agentXFTPDownloadChunk :: AgentMonad m => AgentClient -> UserId -> RcvFileChunkReplica -> XFTPRcvChunkSpec -> m () +agentXFTPDownloadChunk c userId RcvFileChunkReplica {server, replicaId = ChunkReplicaId fId, replicaKey} chunkSpec = + withXFTPClient c (userId, server, fId) "FGET" $ \xftp -> X.downloadXFTPChunk xftp replicaKey fId chunkSpec + agentCbEncrypt :: AgentMonad m => SndQueue -> Maybe C.PublicKeyX25519 -> ByteString -> m ByteString agentCbEncrypt SndQueue {e2eDhSecret, smpClientVersion} e2ePubKey msg = do cmNonce <- liftIO C.randomCbNonce @@ -1041,7 +1121,7 @@ incStat AgentClient {agentStats} n k = do Just v -> modifyTVar' v (+ n) _ -> newTVar n >>= \v -> TM.insert k v agentStats -incClientStat :: AgentClient -> UserId -> ProtocolClient err msg -> ByteString -> ByteString -> IO () +incClientStat :: ProtocolServerClient err msg => AgentClient -> UserId -> Client msg -> ByteString -> ByteString -> IO () incClientStat c userId pc = incClientStatN c userId pc 1 incServerStat :: AgentClient -> UserId -> ProtocolServer p -> ByteString -> ByteString -> IO () @@ -1051,8 +1131,8 @@ incServerStat c userId ProtocolServer {host} cmd res = do where statsKey = AgentStatsKey {userId, host = strEncode $ L.head host, clientTs = "", cmd, res} -incClientStatN :: AgentClient -> UserId -> ProtocolClient err msg -> Int -> ByteString -> ByteString -> IO () +incClientStatN :: ProtocolServerClient err msg => AgentClient -> UserId -> Client msg -> Int -> ByteString -> ByteString -> IO () incClientStatN c userId pc n cmd res = do atomically $ incStat c n statsKey where - statsKey = AgentStatsKey {userId, host = strEncode $ transportHost' pc, clientTs = strEncode $ sessionTs pc, cmd, res} + statsKey = AgentStatsKey {userId, host = strEncode $ clientTransportHost pc, clientTs = strEncode $ clientSessionTs pc, cmd, res} diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index 0bba24180..de256d317 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -24,6 +24,7 @@ module Simplex.Messaging.Agent.Env.SQLite createAgentStore, NtfSupervisor (..), NtfSupervisorCommand (..), + XFTPAgent (..), ) where @@ -37,6 +38,7 @@ import Data.Time.Clock (NominalDiffTime, nominalDay) import Data.Word (Word16) import Network.Socket import Numeric.Natural +import Simplex.FileTransfer.Client (XFTPClientConfig (..), defaultXFTPClientConfig) import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval import Simplex.Messaging.Agent.Store (UserId) @@ -47,7 +49,7 @@ import Simplex.Messaging.Client.Agent () import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Crypto.Ratchet (supportedE2EEncryptVRange) import Simplex.Messaging.Notifications.Types -import Simplex.Messaging.Protocol (NtfServer, supportedSMPClientVRange) +import Simplex.Messaging.Protocol (NtfServer, XFTPServer, XFTPServerWithAuth, supportedSMPClientVRange) import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (TLS, Transport (..)) @@ -63,6 +65,7 @@ type AgentMonad m = (MonadUnliftIO m, MonadReader Env m, MonadError AgentErrorTy data InitialAgentServers = InitialAgentServers { smp :: Map UserId (NonEmpty SMPServerWithAuth), ntf :: [NtfServer], + xftp :: Map UserId (NonEmpty XFTPServerWithAuth), netCfg :: NetworkConfig } @@ -84,6 +87,7 @@ data AgentConfig = AgentConfig yesToMigrations :: Bool, smpCfg :: ProtocolClientConfig, ntfCfg :: ProtocolClientConfig, + xftpCfg :: XFTPClientConfig, reconnectInterval :: RetryInterval, messageRetryInterval :: RetryInterval2, messageTimeout :: NominalDiffTime, @@ -144,6 +148,7 @@ defaultAgentConfig = yesToMigrations = False, smpCfg = defaultClientConfig {defaultTransport = (show defaultSMPPort, transport @TLS)}, ntfCfg = defaultClientConfig {defaultTransport = ("443", transport @TLS)}, + xftpCfg = defaultXFTPClientConfig, reconnectInterval = defaultReconnectInterval, messageRetryInterval = defaultMessageRetryInterval, messageTimeout = 2 * nominalDay, @@ -173,7 +178,8 @@ data Env = Env idsDrg :: TVar ChaChaDRG, clientCounter :: TVar Int, randomServer :: TVar StdGen, - ntfSupervisor :: NtfSupervisor + ntfSupervisor :: NtfSupervisor, + xftpAgent :: XFTPAgent } newSMPAgentEnv :: (MonadUnliftIO m, MonadRandom m) => AgentConfig -> m Env @@ -185,7 +191,8 @@ newSMPAgentEnv config@AgentConfig {database, yesToMigrations, initialClientId} = clientCounter <- newTVarIO initialClientId randomServer <- newTVarIO =<< liftIO newStdGen ntfSupervisor <- atomically . newNtfSubSupervisor $ tbqSize config - return Env {config, store, idsDrg, clientCounter, randomServer, ntfSupervisor} + xftpAgent <- atomically newXFTPAgent + return Env {config, store, idsDrg, clientCounter, randomServer, ntfSupervisor, xftpAgent} createAgentStore :: FilePath -> String -> Bool -> IO SQLiteStore createAgentStore dbFilePath dbKey = createSQLiteStore dbFilePath dbKey Migrations.app @@ -207,3 +214,12 @@ newNtfSubSupervisor qSize = do ntfWorkers <- TM.empty ntfSMPWorkers <- TM.empty pure NtfSupervisor {ntfTkn, ntfSubQ, ntfWorkers, ntfSMPWorkers} + +data XFTPAgent = XFTPAgent + { xftpWorkers :: TMap (Maybe XFTPServer) (TMVar (), Async ()) + } + +newXFTPAgent :: STM XFTPAgent +newXFTPAgent = do + xftpWorkers <- TM.empty + pure XFTPAgent {xftpWorkers} diff --git a/src/Simplex/Messaging/Agent/Protocol.hs b/src/Simplex/Messaging/Agent/Protocol.hs index 1c55a5b42..f1e7cec1d 100644 --- a/src/Simplex/Messaging/Agent/Protocol.hs +++ b/src/Simplex/Messaging/Agent/Protocol.hs @@ -155,6 +155,8 @@ import Database.SQLite.Simple.FromField import Database.SQLite.Simple.ToField import GHC.Generics (Generic) import Generic.Random (genericArbitraryU) +import Simplex.FileTransfer.Protocol (XFTPErrorType) +import Simplex.FileTransfer.Types (RcvFileId) import Simplex.Messaging.Agent.QueryString import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Crypto.Ratchet (E2ERatchetParams, E2ERatchetParamsUri) @@ -282,6 +284,7 @@ data ACommand (p :: AParty) where OK :: ACommand Agent ERR :: AgentErrorType -> ACommand Agent SUSPENDED :: ACommand Agent + FRCVD :: RcvFileId -> FilePath -> ACommand Agent deriving instance Eq (ACommand p) @@ -324,6 +327,7 @@ data ACommandTag (p :: AParty) where OK_ :: ACommandTag Agent ERR_ :: ACommandTag Agent SUSPENDED_ :: ACommandTag Agent + FRCVD_ :: ACommandTag Agent deriving instance Eq (ACommandTag p) @@ -365,6 +369,7 @@ aCommandTag = \case OK -> OK_ ERR _ -> ERR_ SUSPENDED -> SUSPENDED_ + FRCVD {} -> FRCVD_ data QueueDirection = QDRcv | QDSnd deriving (Eq, Show) @@ -1072,6 +1077,8 @@ data AgentErrorType SMP {smpErr :: ErrorType} | -- | NTF protocol errors forwarded to agent clients NTF {ntfErr :: ErrorType} + | -- | XFTP protocol errors forwarded to agent clients + XFTP {xftpErr :: XFTPErrorType} | -- | SMP server errors BROKER {brokerAddress :: String, brokerErr :: BrokerErrorType} | -- | errors of other agents @@ -1166,6 +1173,7 @@ instance StrEncoding AgentErrorType where <|> "CONN " *> (CONN <$> parseRead1) <|> "SMP " *> (SMP <$> strP) <|> "NTF " *> (NTF <$> strP) + <|> "XFTP " *> (XFTP <$> strP) <|> "BROKER " *> (BROKER <$> textP <* " RESPONSE " <*> (RESPONSE <$> textP)) <|> "BROKER " *> (BROKER <$> textP <* " TRANSPORT " <*> (TRANSPORT <$> transportErrorP)) <|> "BROKER " *> (BROKER <$> textP <* A.space <*> parseRead1) @@ -1179,6 +1187,7 @@ instance StrEncoding AgentErrorType where CONN e -> "CONN " <> bshow e SMP e -> "SMP " <> strEncode e NTF e -> "NTF " <> strEncode e + XFTP e -> "XFTP " <> strEncode e BROKER srv (RESPONSE e) -> "BROKER " <> text srv <> " RESPONSE " <> text e BROKER srv (TRANSPORT e) -> "BROKER " <> text srv <> " TRANSPORT " <> serializeTransportError e BROKER srv e -> "BROKER " <> text srv <> " " <> bshow e @@ -1282,6 +1291,7 @@ instance APartyI p => StrEncoding (ACommandTag p) where OK_ -> "OK" ERR_ -> "ERR" SUSPENDED_ -> "SUSPENDED" + FRCVD_ -> "FRCVD" strP = (\(ACmdTag _ t) -> checkParty t) <$?> strP checkParty :: forall t p p'. (APartyI p, APartyI p') => t p' -> Either String (t p) @@ -1332,6 +1342,7 @@ commandP binaryP = OK_ -> pure OK ERR_ -> s (ERR <$> strP) SUSPENDED_ -> pure SUSPENDED + FRCVD_ -> s (FRCVD <$> A.decimal <* A.space <*> strP) where s :: Parser a -> Parser a s p = A.space *> p @@ -1385,6 +1396,7 @@ serializeCommand = \case ERR e -> s (ERR_, e) OK -> s OK_ SUSPENDED -> s SUSPENDED_ + FRCVD fId fPath -> s (FRCVD_, Str $ bshow fId, fPath) where s :: StrEncoding a => a -> ByteString s = strEncode diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index adcf70224..42c69398a 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -535,4 +535,8 @@ data StoreError SEX3dhKeysNotFound | -- | Used to wrap agent errors inside store operations to avoid race conditions SEAgentError AgentErrorType + | -- | XFTP Server not found. + SEXFTPServerNotFound + | -- | XFTP File not found. + SEFileNotFound deriving (Eq, Show, Exception) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 85b5d3fc7..8eae34807 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -123,6 +123,16 @@ module Simplex.Messaging.Agent.Store.SQLite getActiveNtfToken, getNtfRcvQueue, setConnectionNtfs, + -- File transfer + createRcvFile, + getRcvFile, + updateRcvFileChunkReceived, + updateRcvFileStatus, + updateRcvFileComplete, + updateRcvFileChunkReplicaRetries, + getNextRcvChunkToDownload, + getNextRcvFileToDecrypt, + getUnreceivedRcvFiles, -- * utilities withConnection, @@ -155,6 +165,7 @@ import Data.Text (Text) import qualified Data.Text as T import Data.Text.Encoding (decodeLatin1, encodeUtf8) import Data.Time.Clock (UTCTime, getCurrentTime) +import Data.Word (Word32) import Database.SQLite.Simple (FromRow, NamedParam (..), Only (..), Query (..), SQLError, ToRow, field, (:.) (..)) import qualified Database.SQLite.Simple as DB import Database.SQLite.Simple.FromField @@ -162,6 +173,9 @@ import Database.SQLite.Simple.QQ (sql) import Database.SQLite.Simple.ToField (ToField (..)) import qualified Database.SQLite3 as SQLite3 import Network.Socket (ServiceName) +import Simplex.FileTransfer.Description +import Simplex.FileTransfer.Protocol (FileParty (..)) +import Simplex.FileTransfer.Types import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Store.SQLite.Migrations (Migration) @@ -173,7 +187,7 @@ import Simplex.Messaging.Encoding.String import Simplex.Messaging.Notifications.Protocol (DeviceToken (..), NtfSubscriptionId, NtfTknStatus (..), NtfTokenId, SMPQueueNtf (..)) import Simplex.Messaging.Notifications.Types import Simplex.Messaging.Parsers (blobFieldParser, fromTextField_) -import Simplex.Messaging.Protocol (MsgBody, MsgFlags, NtfServer, ProtocolServer (..), RcvNtfDhSecret, SndPublicVerifyKey, pattern NtfServer) +import Simplex.Messaging.Protocol import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Transport.Client (TransportHost) import Simplex.Messaging.Util (bshow, eitherToMaybe, ($>>=), (<$$>)) @@ -1703,3 +1717,178 @@ randomId gVar n = U.encode <$> (atomically . stateTVar gVar $ randomBytesGenerat ntfSubAndSMPAction :: NtfSubAction -> (Maybe NtfSubNTFAction, Maybe NtfSubSMPAction) ntfSubAndSMPAction (NtfSubNTFAction action) = (Just action, Nothing) ntfSubAndSMPAction (NtfSubSMPAction action) = (Nothing, Just action) + +createXFTPServer_ :: DB.Connection -> XFTPServer -> IO Int64 +createXFTPServer_ db newSrv@ProtocolServer {host, port, keyHash} = + getXFTPServerId_ db newSrv >>= \case + Right srvId -> pure srvId + Left _ -> insertNewServer_ + where + insertNewServer_ = do + DB.execute db "INSERT INTO xftp_servers (xftp_host, xftp_port, xftp_key_hash) VALUES (?,?,?)" (host, port, keyHash) + insertedRowId db + +getXFTPServerId_ :: DB.Connection -> XFTPServer -> IO (Either StoreError Int64) +getXFTPServerId_ db ProtocolServer {host, port, keyHash} = do + firstRow fromOnly SEXFTPServerNotFound $ + DB.query db "SELECT xftp_server_id FROM xftp_servers WHERE xftp_host = ? AND xftp_port = ? AND xftp_key_hash = ?" (host, port, keyHash) + +createRcvFile :: DB.Connection -> UserId -> FileDescription 'FPRecipient -> FilePath -> FilePath -> IO RcvFileId +createRcvFile db userId fd@FileDescription {chunks} saveDir tmpPath = do + rcvFileId <- insertRcvFile fd + forM_ chunks $ \fc@FileChunk {replicas} -> do + chunkId <- insertChunk fc rcvFileId + forM_ (zip [1 ..] replicas) $ \(rno, replica) -> insertReplica rno replica chunkId + pure rcvFileId + where + insertRcvFile FileDescription {size, digest, key, nonce, chunkSize} = do + DB.execute + db + "INSERT INTO rcv_files (user_id, size, digest, key, nonce, chunk_size, tmp_path, save_dir, status) VALUES (?,?,?,?,?,?,?,?,?)" + (userId, size, digest, key, nonce, chunkSize, tmpPath, saveDir, RFSReceiving) + insertedRowId db + insertChunk FileChunk {chunkNo, chunkSize, digest} rcvFileId = do + DB.execute + db + "INSERT INTO rcv_file_chunks (rcv_file_id, chunk_no, chunk_size, digest) VALUES (?,?,?,?)" + (rcvFileId, chunkNo, chunkSize, digest) + insertedRowId db + insertReplica :: Int -> FileChunkReplica -> Int64 -> IO () + insertReplica replicaNo FileChunkReplica {server, replicaId, replicaKey} chunkId = do + srvId <- createXFTPServer_ db server + DB.execute + db + "INSERT INTO rcv_file_chunk_replicas (replica_number, rcv_file_chunk_id, xftp_server_id, replica_id, replica_key) VALUES (?,?,?,?,?)" + (replicaNo, chunkId, srvId, replicaId, replicaKey) + +getRcvFile :: DB.Connection -> RcvFileId -> IO (Either StoreError RcvFile) +getRcvFile db rcvFileId = runExceptT $ do + fd@RcvFile {userId, tmpPath} <- ExceptT getFile + chunks <- liftIO $ getChunks userId tmpPath + pure (fd {chunks} :: RcvFile) + where + getFile :: IO (Either StoreError RcvFile) + getFile = do + firstRow toFile SEFileNotFound $ + DB.query + db + [sql| + SELECT user_id, size, digest, key, nonce, chunk_size, tmp_path, save_dir, save_path, status + FROM rcv_files + WHERE rcv_file_id = ? + |] + (Only rcvFileId) + where + toFile :: (UserId, FileSize Int64, FileDigest, C.SbKey, C.CbNonce, FileSize Word32, FilePath, FilePath, Maybe FilePath, RcvFileStatus) -> RcvFile + toFile (userId, size, digest, key, nonce, chunkSize, tmpPath, saveDir, savePath, status) = + RcvFile {userId, rcvFileId, size, digest, key, nonce, chunkSize, tmpPath, saveDir, savePath, status, chunks = []} + getChunks :: UserId -> FilePath -> IO [RcvFileChunk] + getChunks userId fileTmpPath = do + chunks <- + map toChunk + <$> DB.query + db + [sql| + SELECT rcv_file_chunk_id, chunk_no, chunk_size, digest, tmp_path, next_delay + FROM rcv_file_chunks + WHERE rcv_file_id = ? + |] + (Only rcvFileId) + forM chunks $ \chunk@RcvFileChunk {rcvChunkId} -> do + replicas' <- getChunkReplicas rcvChunkId + pure (chunk {replicas = replicas'} :: RcvFileChunk) + where + toChunk :: (Int64, Int, FileSize Word32, FileDigest, Maybe FilePath, Maybe Int) -> RcvFileChunk + toChunk (rcvChunkId, chunkNo, chunkSize, digest, chunkTmpPath, nextDelay) = + RcvFileChunk {userId, rcvFileId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, nextDelay, replicas = []} + getChunkReplicas :: Int64 -> IO [RcvFileChunkReplica] + getChunkReplicas chunkId = do + map toReplica + <$> DB.query + db + [sql| + SELECT + r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.acknowledged, r.retries, + s.xftp_host, s.xftp_port, s.xftp_key_hash + FROM rcv_file_chunk_replicas r + JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id + WHERE r.rcv_file_chunk_id = ? + |] + (Only chunkId) + where + toReplica :: (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Bool, Int, NonEmpty TransportHost, ServiceName, C.KeyHash) -> RcvFileChunkReplica + toReplica (rcvChunkReplicaId, replicaId, replicaKey, received, acknowledged, retries, host, port, keyHash) = + let server = XFTPServer host port keyHash + in RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, acknowledged, retries} + +updateRcvFileChunkReceived :: DB.Connection -> Int64 -> Int64 -> RcvFileId -> FilePath -> IO (Either StoreError RcvFile) +updateRcvFileChunkReceived db rId cId fId chunkTmpPath = do + updatedAt <- getCurrentTime + DB.execute db "UPDATE rcv_file_chunk_replicas SET received = 1, updated_at = ? WHERE rcv_file_chunk_replica_id = ?" (updatedAt, rId) + DB.execute db "UPDATE rcv_file_chunks SET tmp_path = ?, updated_at = ? WHERE rcv_file_chunk_id = ?" (chunkTmpPath, updatedAt, cId) + getRcvFile db fId + +updateRcvFileStatus :: DB.Connection -> RcvFileId -> RcvFileStatus -> IO () +updateRcvFileStatus db rcvFileId status = do + updatedAt <- getCurrentTime + DB.execute db "UPDATE rcv_files SET status = ?, updated_at = ? WHERE rcv_file_id = ?" (status, updatedAt, rcvFileId) + +updateRcvFileComplete :: DB.Connection -> RcvFileId -> FilePath -> IO () +updateRcvFileComplete db rcvFileId savePath = do + updatedAt <- getCurrentTime + DB.execute db "UPDATE rcv_files SET save_path = ?, status = ?, updated_at = ? WHERE rcv_file_id = ?" (savePath, RFSComplete, updatedAt, rcvFileId) + +updateRcvFileChunkReplicaRetries :: DB.Connection -> Int64 -> IO () +updateRcvFileChunkReplicaRetries _db _replicaId = do + -- update rcv_file_chunk_replicas + undefined + +getNextRcvChunkToDownload :: DB.Connection -> XFTPServer -> IO (Maybe RcvFileChunk) +getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} = do + maybeFirstRow toChunk $ + DB.query + db + [sql| + SELECT + f.user_id, f.rcv_file_id, c.rcv_file_chunk_id, c.chunk_no, c.chunk_size, c.digest, f.tmp_path, c.tmp_path, c.next_delay, + r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.acknowledged, r.retries + FROM rcv_file_chunk_replicas r + JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id + JOIN rcv_file_chunks c ON c.rcv_file_chunk_id = r.rcv_file_chunk_id + JOIN rcv_files f ON f.rcv_file_id = c.rcv_file_id + WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ? + AND r.received = 0 AND r.replica_number = 1 + ORDER BY r.created_at ASC + LIMIT 1 + |] + (host, port, keyHash) + where + toChunk :: ((UserId, RcvFileId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath, Maybe Int) :. (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Bool, Int)) -> RcvFileChunk + toChunk ((userId, rcvFileId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, nextDelay) :. (rcvChunkReplicaId, replicaId, replicaKey, received, acknowledged, retries)) = + RcvFileChunk + { userId, + rcvFileId, + rcvChunkId, + chunkNo, + chunkSize, + digest, + fileTmpPath, + chunkTmpPath, + nextDelay, + replicas = [RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, acknowledged, retries}] + } + +getNextRcvFileToDecrypt :: DB.Connection -> IO (Maybe RcvFile) +getNextRcvFileToDecrypt db = do + fileId_ :: Maybe RcvFileId <- + maybeFirstRow fromOnly $ + DB.query db "SELECT rcv_file_id FROM rcv_files WHERE status = ? ORDER BY created_at ASC LIMIT 1" (Only RFSReceived) + case fileId_ of + Nothing -> pure Nothing + Just fileId -> eitherToMaybe <$> getRcvFile db fileId + +getUnreceivedRcvFiles :: DB.Connection -> IO [RcvFile] +getUnreceivedRcvFiles _db = do + -- get unique file ids from rcv_files where status /= complete + -- getRcvFile for each file id + undefined diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs index d896da044..7cdc153a4 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs @@ -41,6 +41,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230110_users import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230117_fkey_indexes import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230120_delete_errors import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230217_server_key_hash +import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230223_files import Simplex.Messaging.Encoding.String import Simplex.Messaging.Transport.Client (TransportHost) @@ -61,7 +62,8 @@ schemaMigrations = ("m20230110_users", m20230110_users), ("m20230117_fkey_indexes", m20230117_fkey_indexes), ("m20230120_delete_errors", m20230120_delete_errors), - ("m20230217_server_key_hash", m20230217_server_key_hash) + ("m20230217_server_key_hash", m20230217_server_key_hash), + ("m20230223_files", m20230223_files) ] -- | The list of migrations in ascending order by date diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230223_files.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230223_files.hs new file mode 100644 index 000000000..11cd45801 --- /dev/null +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230223_files.hs @@ -0,0 +1,69 @@ +{-# LANGUAGE QuasiQuotes #-} + +module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230223_files where + +import Database.SQLite.Simple (Query) +import Database.SQLite.Simple.QQ (sql) + +m20230223_files :: Query +m20230223_files = + [sql| +CREATE TABLE xftp_servers ( + xftp_server_id INTEGER PRIMARY KEY, + xftp_host TEXT NOT NULL, + xftp_port TEXT NOT NULL, + xftp_key_hash BLOB NOT NULL, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + updated_at TEXT NOT NULL DEFAULT (datetime('now')), + UNIQUE(xftp_host, xftp_port, xftp_key_hash) +); + +CREATE TABLE rcv_files ( + rcv_file_id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE, + size INTEGER NOT NULL, + digest BLOB NOT NULL, + key BLOB NOT NULL, + nonce BLOB NOT NULL, + chunk_size INTEGER NOT NULL, + tmp_path TEXT NOT NULL, + save_dir TEXT NOT NULL, + save_path TEXT, + status TEXT NOT NULL, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + updated_at TEXT NOT NULL DEFAULT (datetime('now')) +); + +CREATE INDEX idx_rcv_files_user_id ON rcv_files(user_id); + +CREATE TABLE rcv_file_chunks ( + rcv_file_chunk_id INTEGER PRIMARY KEY, + rcv_file_id INTEGER NOT NULL REFERENCES rcv_files ON DELETE CASCADE, + chunk_no INTEGER NOT NULL, + chunk_size INTEGER NOT NULL, + digest BLOB NOT NULL, + tmp_path TEXT, + next_delay INTEGER, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + updated_at TEXT NOT NULL DEFAULT (datetime('now')) +); + +CREATE INDEX idx_rcv_file_chunks_rcv_file_id ON rcv_file_chunks(rcv_file_id); + +CREATE TABLE rcv_file_chunk_replicas ( + rcv_file_chunk_replica_id INTEGER PRIMARY KEY, + rcv_file_chunk_id INTEGER NOT NULL REFERENCES rcv_file_chunks ON DELETE CASCADE, + replica_number INTEGER NOT NULL, + xftp_server_id INTEGER NOT NULL REFERENCES xftp_servers ON DELETE CASCADE, + replica_id BLOB NOT NULL, + replica_key BLOB NOT NULL, + received INTEGER NOT NULL DEFAULT 0, + acknowledged INTEGER NOT NULL DEFAULT 0, + retries INTEGER NOT NULL DEFAULT 0, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + updated_at TEXT NOT NULL DEFAULT (datetime('now')) +); + +CREATE INDEX idx_rcv_file_chunk_replicas_rcv_file_chunk_id ON rcv_file_chunk_replicas(rcv_file_chunk_id); +CREATE INDEX idx_rcv_file_chunk_replicas_xftp_server_id ON rcv_file_chunk_replicas(xftp_server_id); +|] diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql index 540329a57..13aa097a6 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql @@ -283,3 +283,59 @@ CREATE INDEX idx_snd_messages_conn_id_internal_id ON snd_messages( internal_id ); CREATE INDEX idx_snd_queues_host_port ON snd_queues(host, port); +CREATE TABLE xftp_servers( + xftp_server_id INTEGER PRIMARY KEY, + xftp_host TEXT NOT NULL, + xftp_port TEXT NOT NULL, + xftp_key_hash BLOB NOT NULL, + created_at TEXT NOT NULL DEFAULT(datetime('now')), + updated_at TEXT NOT NULL DEFAULT(datetime('now')), + UNIQUE(xftp_host, xftp_port, xftp_key_hash) +); +CREATE TABLE rcv_files( + rcv_file_id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE, + size INTEGER NOT NULL, + digest BLOB NOT NULL, + key BLOB NOT NULL, + nonce BLOB NOT NULL, + chunk_size INTEGER NOT NULL, + tmp_path TEXT NOT NULL, + save_dir TEXT NOT NULL, + save_path TEXT, + status TEXT NOT NULL, + created_at TEXT NOT NULL DEFAULT(datetime('now')), + updated_at TEXT NOT NULL DEFAULT(datetime('now')) +); +CREATE INDEX idx_rcv_files_user_id ON rcv_files(user_id); +CREATE TABLE rcv_file_chunks( + rcv_file_chunk_id INTEGER PRIMARY KEY, + rcv_file_id INTEGER NOT NULL REFERENCES rcv_files ON DELETE CASCADE, + chunk_no INTEGER NOT NULL, + chunk_size INTEGER NOT NULL, + digest BLOB NOT NULL, + tmp_path TEXT, + next_delay INTEGER, + created_at TEXT NOT NULL DEFAULT(datetime('now')), + updated_at TEXT NOT NULL DEFAULT(datetime('now')) +); +CREATE INDEX idx_rcv_file_chunks_rcv_file_id ON rcv_file_chunks(rcv_file_id); +CREATE TABLE rcv_file_chunk_replicas( + rcv_file_chunk_replica_id INTEGER PRIMARY KEY, + rcv_file_chunk_id INTEGER NOT NULL REFERENCES rcv_file_chunks ON DELETE CASCADE, + replica_number INTEGER NOT NULL, + xftp_server_id INTEGER NOT NULL REFERENCES xftp_servers ON DELETE CASCADE, + replica_id BLOB NOT NULL, + replica_key BLOB NOT NULL, + received INTEGER NOT NULL DEFAULT 0, + acknowledged INTEGER NOT NULL DEFAULT 0, + retries INTEGER NOT NULL DEFAULT 0, + created_at TEXT NOT NULL DEFAULT(datetime('now')), + updated_at TEXT NOT NULL DEFAULT(datetime('now')) +); +CREATE INDEX idx_rcv_file_chunk_replicas_rcv_file_chunk_id ON rcv_file_chunk_replicas( + rcv_file_chunk_id +); +CREATE INDEX idx_rcv_file_chunk_replicas_xftp_server_id ON rcv_file_chunk_replicas( + xftp_server_id +); diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index b4d633860..7df7cf97d 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -31,7 +31,7 @@ module Simplex.Messaging.Client SMPClient, getProtocolClient, closeProtocolClient, - clientServer, + protocolClientServer, transportHost', transportSession', @@ -254,8 +254,8 @@ chooseTransportHost NetworkConfig {socksProxy, hostMode, requiredHostMode} hosts onionHost = find isOnionHost hosts publicHost = find (not . isOnionHost) hosts -clientServer :: ProtocolTypeI (ProtoType msg) => ProtocolClient err msg -> String -clientServer = B.unpack . strEncode . snd3 . transportSession . client_ +protocolClientServer :: ProtocolTypeI (ProtoType msg) => ProtocolClient err msg -> String +protocolClientServer = B.unpack . strEncode . snd3 . transportSession . client_ where snd3 (_, s, _) = s diff --git a/src/Simplex/Messaging/Crypto.hs b/src/Simplex/Messaging/Crypto.hs index 10ad2b047..dcc8d595a 100644 --- a/src/Simplex/Messaging/Crypto.hs +++ b/src/Simplex/Messaging/Crypto.hs @@ -1032,6 +1032,10 @@ instance ToJSON CbNonce where instance FromJSON CbNonce where parseJSON = strParseJSON "CbNonce" +instance FromField CbNonce where fromField f = CryptoBoxNonce <$> fromField f + +instance ToField CbNonce where toField (CryptoBoxNonce s) = toField s + cbNonce :: ByteString -> CbNonce cbNonce s | len == 24 = CryptoBoxNonce s @@ -1072,6 +1076,10 @@ instance ToJSON SbKey where instance FromJSON SbKey where parseJSON = strParseJSON "SbKey" +instance FromField SbKey where fromField f = SecretBoxKey <$> fromField f + +instance ToField SbKey where toField (SecretBoxKey s) = toField s + sbKey :: ByteString -> Either String SbKey sbKey s | B.length s == 32 = Right $ SecretBoxKey s diff --git a/tests/SMPAgentClient.hs b/tests/SMPAgentClient.hs index b21554891..2687594d7 100644 --- a/tests/SMPAgentClient.hs +++ b/tests/SMPAgentClient.hs @@ -32,6 +32,7 @@ import Simplex.Messaging.Agent.Server (runSMPAgentBlocking) import Simplex.Messaging.Agent.Store (UserId) import Simplex.Messaging.Client (ProtocolClientConfig (..), chooseTransportHost, defaultClientConfig, defaultNetworkConfig) import Simplex.Messaging.Parsers (parseAll) +import Simplex.Messaging.Protocol (ProtoServerWithAuth, XFTPServer) import Simplex.Messaging.Transport import Simplex.Messaging.Transport.Client import Test.Hspec @@ -173,11 +174,15 @@ testSMPServer = "smp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:50 testSMPServer2 :: SMPServer testSMPServer2 = "smp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:5002" +testXFTPServer :: XFTPServer +testXFTPServer = "xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7001" + initAgentServers :: InitialAgentServers initAgentServers = InitialAgentServers { smp = userServers [noAuthSrv testSMPServer], ntf = ["ntf://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:6001"], + xftp = userServers [noAuthSrv testXFTPServer], netCfg = defaultNetworkConfig {tcpTimeout = 500_000, tcpConnectTimeout = 500_000} } @@ -208,7 +213,7 @@ withSmpAgentThreadOn_ t (port', smpPort', db') afterProcess = (\started -> runSMPAgentBlocking t started cfg' initServers') afterProcess -userServers :: NonEmpty SMPServerWithAuth -> Map UserId (NonEmpty SMPServerWithAuth) +userServers :: NonEmpty (ProtoServerWithAuth p) -> Map UserId (NonEmpty (ProtoServerWithAuth p)) userServers srvs = M.fromList [(1, srvs)] withSmpAgentThreadOn :: (MonadUnliftIO m, MonadRandom m) => ATransport -> (ServiceName, ServiceName, AgentDatabase) -> (ThreadId -> m a) -> m a diff --git a/tests/Test.hs b/tests/Test.hs index 2643674f6..d76357be4 100644 --- a/tests/Test.hs +++ b/tests/Test.hs @@ -16,6 +16,7 @@ import Simplex.Messaging.Transport.WebSockets (WS) import System.Directory (createDirectoryIfMissing, removeDirectoryRecursive) import System.Environment (setEnv) import Test.Hspec +import XFTPAgent import XFTPCLI import XFTPServerTests (xftpServerTests) @@ -46,4 +47,5 @@ main = do describe "XFTP server" xftpServerTests describe "XFTP file description" fileDescriptionTests describe "XFTP CLI" xftpCLITests + describe "XFTP agent" xftpAgentTests describe "Server CLIs" cliTests diff --git a/tests/XFTPAgent.hs b/tests/XFTPAgent.hs new file mode 100644 index 000000000..2bca61a17 --- /dev/null +++ b/tests/XFTPAgent.hs @@ -0,0 +1,59 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module XFTPAgent where + +import AgentTests.FunctionalAPITests (get, runRight_) +import Control.Monad.Except +import Data.Bifunctor (first) +import qualified Data.ByteString as LB +import SMPAgentClient (agentCfg, initAgentServers) +import Simplex.FileTransfer.Description +import Simplex.FileTransfer.Protocol (FileParty (..), checkParty) +import Simplex.Messaging.Agent (getSMPAgentClient, xftpReceiveFile) +import Simplex.Messaging.Agent.Protocol (ACommand (FRCVD), AgentErrorType (..)) +import Simplex.Messaging.Encoding.String (StrEncoding (..)) +import System.Directory (getFileSize) +import System.FilePath (()) +import Test.Hspec +import XFTPCLI +import XFTPClient + +xftpAgentTests :: Spec +xftpAgentTests = around_ testBracket . describe "Functional API" $ do + it "should receive file" testXFTPAgentReceive + +testXFTPAgentReceive :: IO () +testXFTPAgentReceive = withXFTPServer $ do + -- send file using CLI + let filePath = senderFiles "testfile" + xftpCLI ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath] + file <- LB.readFile filePath + getFileSize filePath `shouldReturn` mb 17 + let fdRcv = filePath <> ".xftp" "rcv1.xftp" + fdSnd = filePath <> ".xftp" "snd.xftp.private" + progress : sendResult <- xftpCLI ["send", filePath, senderFiles, "-s", testXFTPServerStr, "--tmp=tests/tmp"] + progress `shouldSatisfy` uploadProgress + sendResult + `shouldBe` [ "Sender file description: " <> fdSnd, + "Pass file descriptions to the recipient(s):", + fdRcv + ] + -- receive file using agent + rcp <- getSMPAgentClient agentCfg initAgentServers + runRight_ $ do + fd :: ValidFileDescription 'FPRecipient <- getFileDescription fdRcv + fId <- xftpReceiveFile rcp 1 fd recipientFiles + ("", "", FRCVD fId' path) <- get rcp + liftIO $ do + fId' `shouldBe` fId + LB.readFile path `shouldReturn` file + where + getFileDescription :: FilePath -> ExceptT AgentErrorType IO (ValidFileDescription 'FPRecipient) + getFileDescription path = do + fd :: AFileDescription <- ExceptT $ first (INTERNAL . ("Failed to parse file description: " <>)) . strDecode <$> LB.readFile path + vfd <- liftEither . first INTERNAL $ validateFileDescription fd + case vfd of + AVFD fd' -> either (throwError . INTERNAL) pure $ checkParty fd' diff --git a/tests/XFTPCLI.hs b/tests/XFTPCLI.hs index 6cbbb9090..14c62aea5 100644 --- a/tests/XFTPCLI.hs +++ b/tests/XFTPCLI.hs @@ -33,16 +33,19 @@ senderFiles = "tests/tmp/xftp-sender-files" recipientFiles :: FilePath recipientFiles = "tests/tmp/xftp-recipient-files" +xftpCLI :: [String] -> IO [String] +xftpCLI params = lines <$> capture_ (withArgs params xftpClientCLI) + testXFTPCLISendReceive :: IO () testXFTPCLISendReceive = withXFTPServer $ do let filePath = senderFiles "testfile" - xftp ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath] + xftpCLI ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath] file <- LB.readFile filePath getFileSize filePath `shouldReturn` mb 17 let fdRcv1 = filePath <> ".xftp" "rcv1.xftp" fdRcv2 = filePath <> ".xftp" "rcv2.xftp" fdSnd = filePath <> ".xftp" "snd.xftp.private" - progress : sendResult <- xftp ["send", filePath, senderFiles, "-n", "2", "-s", testXFTPServerStr, "--tmp=tests/tmp"] + progress : sendResult <- xftpCLI ["send", filePath, senderFiles, "-n", "2", "-s", testXFTPServerStr, "--tmp=tests/tmp"] progress `shouldSatisfy` uploadProgress sendResult `shouldBe` [ "Sender file description: " <> fdSnd, @@ -55,15 +58,14 @@ testXFTPCLISendReceive = withXFTPServer $ do testInfoFile fdRcv2 "Recipient" testReceiveFile fdRcv2 "testfile_1" file testInfoFile fdSnd "Sender" - xftp ["recv", fdSnd, recipientFiles, "--tmp=tests/tmp"] + xftpCLI ["recv", fdSnd, recipientFiles, "--tmp=tests/tmp"] `shouldThrow` anyException where - xftp params = lines <$> capture_ (withArgs params xftpClientCLI) testInfoFile fd party = do - xftp ["info", fd] + xftpCLI ["info", fd] `shouldReturn` [party <> " file description", "File download size: 18mb", "File server(s):", testXFTPServerStr <> ": 18mb"] testReceiveFile fd fileName file = do - progress : recvResult <- xftp ["recv", fd, recipientFiles, "--tmp=tests/tmp", "-y"] + progress : recvResult <- xftpCLI ["recv", fd, recipientFiles, "--tmp=tests/tmp", "-y"] progress `shouldSatisfy` downloadProgress fileName recvResult `shouldBe` ["File description " <> fd <> " is deleted."] LB.readFile (recipientFiles fileName) `shouldReturn` file @@ -71,13 +73,13 @@ testXFTPCLISendReceive = withXFTPServer $ do testXFTPCLISendReceive2servers :: IO () testXFTPCLISendReceive2servers = withXFTPServer . withXFTPServer2 $ do let filePath = senderFiles "testfile" - xftp ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath] + xftpCLI ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath] file <- LB.readFile filePath getFileSize filePath `shouldReturn` mb 17 let fdRcv1 = filePath <> ".xftp" "rcv1.xftp" fdRcv2 = filePath <> ".xftp" "rcv2.xftp" fdSnd = filePath <> ".xftp" "snd.xftp.private" - progress : sendResult <- xftp ["send", filePath, senderFiles, "-n", "2", "-s", testXFTPServerStr <> ";" <> testXFTPServerStr2, "--tmp=tests/tmp"] + progress : sendResult <- xftpCLI ["send", filePath, senderFiles, "-n", "2", "-s", testXFTPServerStr <> ";" <> testXFTPServerStr2, "--tmp=tests/tmp"] progress `shouldSatisfy` uploadProgress sendResult `shouldBe` [ "Sender file description: " <> fdSnd, @@ -88,9 +90,8 @@ testXFTPCLISendReceive2servers = withXFTPServer . withXFTPServer2 $ do testReceiveFile fdRcv1 "testfile" file testReceiveFile fdRcv2 "testfile_1" file where - xftp params = lines <$> capture_ (withArgs params xftpClientCLI) testReceiveFile fd fileName file = do - partyStr : sizeStr : srvStr : srvs <- xftp ["info", fd] + partyStr : sizeStr : srvStr : srvs <- xftpCLI ["info", fd] partyStr `shouldContain` "Recipient file description" sizeStr `shouldBe` "File download size: 18mb" srvStr `shouldBe` "File server(s):" @@ -100,7 +101,7 @@ testXFTPCLISendReceive2servers = withXFTPServer . withXFTPServer2 $ do srv1 `shouldContain` testXFTPServerStr srv2 `shouldContain` testXFTPServerStr2 _ -> print srvs >> error "more than 2 servers returned" - progress : recvResult <- xftp ["recv", fd, recipientFiles, "--tmp=tests/tmp", "-y"] + progress : recvResult <- xftpCLI ["recv", fd, recipientFiles, "--tmp=tests/tmp", "-y"] progress `shouldSatisfy` downloadProgress fileName recvResult `shouldBe` ["File description " <> fd <> " is deleted."] LB.readFile (recipientFiles fileName) `shouldReturn` file @@ -108,13 +109,13 @@ testXFTPCLISendReceive2servers = withXFTPServer . withXFTPServer2 $ do testXFTPCLIDelete :: IO () testXFTPCLIDelete = withXFTPServer . withXFTPServer2 $ do let filePath = senderFiles "testfile" - xftp ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath] + xftpCLI ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath] file <- LB.readFile filePath getFileSize filePath `shouldReturn` mb 17 let fdRcv1 = filePath <> ".xftp" "rcv1.xftp" fdRcv2 = filePath <> ".xftp" "rcv2.xftp" fdSnd = filePath <> ".xftp" "snd.xftp.private" - progress : sendResult <- xftp ["send", filePath, senderFiles, "-n", "2", "-s", testXFTPServerStr <> ";" <> testXFTPServerStr2, "--tmp=tests/tmp"] + progress : sendResult <- xftpCLI ["send", filePath, senderFiles, "-n", "2", "-s", testXFTPServerStr <> ";" <> testXFTPServerStr2, "--tmp=tests/tmp"] progress `shouldSatisfy` uploadProgress sendResult `shouldBe` [ "Sender file description: " <> fdSnd, @@ -122,23 +123,21 @@ testXFTPCLIDelete = withXFTPServer . withXFTPServer2 $ do fdRcv1, fdRcv2 ] - xftp ["del", fdRcv1] + xftpCLI ["del", fdRcv1] `shouldThrow` anyException - progress1 : recvResult <- xftp ["recv", fdRcv1, recipientFiles, "--tmp=tests/tmp", "-y"] + progress1 : recvResult <- xftpCLI ["recv", fdRcv1, recipientFiles, "--tmp=tests/tmp", "-y"] progress1 `shouldSatisfy` downloadProgress "testfile" recvResult `shouldBe` ["File description " <> fdRcv1 <> " is deleted."] LB.readFile (recipientFiles "testfile") `shouldReturn` file fs1 <- listDirectory xftpServerFiles fs2 <- listDirectory xftpServerFiles2 length fs1 + length fs2 `shouldBe` 6 - xftp ["del", fdSnd, "-y"] + xftpCLI ["del", fdSnd, "-y"] `shouldReturn` ["File deleted! \r", "File description " <> fdSnd <> " is deleted."] listDirectory xftpServerFiles >>= (`shouldBe` []) listDirectory xftpServerFiles2 >>= (`shouldBe` []) - xftp ["recv", fdRcv2, recipientFiles, "--tmp=tests/tmp"] + xftpCLI ["recv", fdRcv2, recipientFiles, "--tmp=tests/tmp"] `shouldThrow` anyException - where - xftp params = lines <$> capture_ (withArgs params xftpClientCLI) testPrepareChunkSizes :: IO () testPrepareChunkSizes = do diff --git a/tests/XFTPClient.hs b/tests/XFTPClient.hs index 934ce3629..5bfcadf47 100644 --- a/tests/XFTPClient.hs +++ b/tests/XFTPClient.hs @@ -110,6 +110,6 @@ testXFTPClientConfig = defaultXFTPClientConfig testXFTPClient :: HasCallStack => (HasCallStack => XFTPClient -> IO a) -> IO a testXFTPClient client = - getXFTPClient (1, testXFTPServer, Nothing) testXFTPClientConfig (pure ()) >>= \case + getXFTPClient (1, testXFTPServer, Nothing) testXFTPClientConfig (\_ -> pure ()) >>= \case Right c -> client c Left e -> error $ show e