xftp: agent receive file (#637)

* xftp: agent receive file draft

* receiveFile more stubs, changes to types, schema

* cabal file

* comments

* xftp_server_id

* schema changes, get client, local worker/action

* agent env, save file description, adjust schema

* client stubs

* download chunk wip, store, schema, types

* remove commented code

* read file description, schema, types

* check received, decrypt

* remove pure

* todo

* add XFTP to agent client

* add user id

* agent test

* tests

* rename supervisor into agent

---------

Co-authored-by: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com>
This commit is contained in:
spaced4ndy
2023-03-03 18:50:16 +04:00
committed by GitHub
parent 6ed4dd1515
commit cf147397a4
26 changed files with 942 additions and 95 deletions
+1
View File
@@ -21,6 +21,7 @@ servers =
InitialAgentServers
{ smp = M.fromList [(1, L.fromList ["smp://bU0K-bRg24xWW__lS0umO1Zdw_SXqpJNtm1_RrPLViE=@localhost:5223"])],
ntf = [],
xftp = M.fromList [],
netCfg = defaultNetworkConfig
}
+10
View File
@@ -35,6 +35,7 @@ flag swift
library
exposed-modules:
Simplex.FileTransfer
Simplex.FileTransfer.Agent
Simplex.FileTransfer.Client
Simplex.FileTransfer.Client.Agent
Simplex.FileTransfer.Client.Main
@@ -47,6 +48,8 @@ library
Simplex.FileTransfer.Server.Store
Simplex.FileTransfer.Server.StoreLog
Simplex.FileTransfer.Transport
Simplex.FileTransfer.Types
Simplex.FileTransfer.Util
Simplex.Messaging.Agent
Simplex.Messaging.Agent.Client
Simplex.Messaging.Agent.Env.SQLite
@@ -72,6 +75,7 @@ library
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230117_fkey_indexes
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230120_delete_errors
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230217_server_key_hash
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230223_files
Simplex.Messaging.Agent.TAsyncs
Simplex.Messaging.Agent.TRcvQueues
Simplex.Messaging.Client
@@ -224,6 +228,7 @@ executable ntf-server
, sqlcipher-simple ==0.4.*
, stm ==2.5.*
, template-haskell ==2.16.*
, temporary ==1.3.*
, text ==1.2.*
, time ==1.9.*
, time-compat ==1.9.*
@@ -287,6 +292,7 @@ executable smp-agent
, sqlcipher-simple ==0.4.*
, stm ==2.5.*
, template-haskell ==2.16.*
, temporary ==1.3.*
, text ==1.2.*
, time ==1.9.*
, time-compat ==1.9.*
@@ -350,6 +356,7 @@ executable smp-server
, sqlcipher-simple ==0.4.*
, stm ==2.5.*
, template-haskell ==2.16.*
, temporary ==1.3.*
, text ==1.2.*
, time ==1.9.*
, time-compat ==1.9.*
@@ -477,6 +484,7 @@ executable xftp-server
, sqlcipher-simple ==0.4.*
, stm ==2.5.*
, template-haskell ==2.16.*
, temporary ==1.3.*
, text ==1.2.*
, time ==1.9.*
, time-compat ==1.9.*
@@ -517,6 +525,7 @@ test-suite simplexmq-test
ServerTests
SMPAgentClient
SMPClient
XFTPAgent
XFTPCLI
XFTPClient
XFTPServerTests
@@ -568,6 +577,7 @@ test-suite simplexmq-test
, sqlcipher-simple ==0.4.*
, stm ==2.5.*
, template-haskell ==2.16.*
, temporary ==1.3.*
, text ==1.2.*
, time ==1.9.*
, time-compat ==1.9.*
+182
View File
@@ -0,0 +1,182 @@
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Simplex.FileTransfer.Agent
( receiveFile,
)
where
import Control.Monad
import Control.Monad.Except
import Control.Monad.Reader
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.Bifunctor (first)
import qualified Data.ByteString.Lazy.Char8 as LB
import Data.Int (Int64)
import Simplex.FileTransfer.Description
import Simplex.FileTransfer.Protocol (FileParty (..))
import Simplex.FileTransfer.Transport (XFTPRcvChunkSpec (..))
import Simplex.FileTransfer.Types
import Simplex.FileTransfer.Util (uniqueCombine)
import Simplex.Messaging.Agent.Client
import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.Protocol (ACommand (FRCVD), AParty (..), AgentErrorType (INTERNAL))
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.SQLite
import qualified Simplex.Messaging.Crypto.Lazy as LC
import Simplex.Messaging.Encoding
import Simplex.Messaging.Protocol (XFTPServer)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util (whenM)
import UnliftIO
import UnliftIO.Directory
import qualified UnliftIO.Exception as E
receiveFile :: AgentMonad m => AgentClient -> UserId -> ValidFileDescription 'FPRecipient -> FilePath -> m Int64
receiveFile c userId (ValidFileDescription fd@FileDescription {chunks}) xftpPath = do
encPath <- uniqueCombine xftpPath "xftp.encrypted"
createDirectory encPath
fId <- withStore' c $ \db -> createRcvFile db userId fd xftpPath encPath
forM_ chunks downloadChunk
pure fId
where
downloadChunk :: AgentMonad m => FileChunk -> m ()
downloadChunk FileChunk {replicas = (FileChunkReplica {server} : _)} = do
addWorker c (Just server)
downloadChunk _ = throwError $ INTERNAL "no replicas"
addWorker :: AgentMonad m => AgentClient -> Maybe XFTPServer -> m ()
addWorker c srv_ = do
ws <- asks $ xftpWorkers . xftpAgent
atomically (TM.lookup srv_ ws) >>= \case
Nothing -> do
doWork <- newTMVarIO ()
let runWorker = case srv_ of
Just srv -> runXFTPWorker c srv doWork
Nothing -> runXFTPLocalWorker c doWork
worker <- async $ runWorker `E.finally` atomically (TM.delete srv_ ws)
atomically $ TM.insert srv_ (doWork, worker) ws
Just (doWork, _) ->
void . atomically $ tryPutTMVar doWork ()
runXFTPWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m ()
runXFTPWorker c srv doWork = do
forever $ do
void . atomically $ readTMVar doWork
agentOperationBracket c AORcvNetwork throwWhenInactive runXftpOperation
where
runXftpOperation :: m ()
runXftpOperation = do
nextChunk <- withStore' c (`getNextRcvChunkToDownload` srv)
case nextChunk of
Nothing -> noWorkToDo
Just fc@RcvFileChunk {nextDelay} -> do
ri <- asks $ reconnectInterval . config
let ri' = maybe ri (\d -> ri {initialInterval = d}) nextDelay
withRetryInterval ri' $ \loop ->
downloadFileChunk fc
`catchError` \e -> do
liftIO $ print e
-- TODO don't loop on permanent errors
-- TODO increase replica retries count
-- TODO update nextDelay (modify withRetryInterval to expose current delay)
loop
noWorkToDo = void . atomically $ tryTakeTMVar doWork
downloadFileChunk :: RcvFileChunk -> m ()
downloadFileChunk RcvFileChunk {userId, rcvFileId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, replicas = replica : _} = do
chunkPath <- uniqueCombine fileTmpPath $ show chunkNo
let chunkSpec = XFTPRcvChunkSpec chunkPath (unFileSize chunkSize) (unFileDigest digest)
agentXFTPDownloadChunk c userId replica chunkSpec
fileReceived <- withStore c $ \db -> runExceptT $ do
-- both actions can be done in a single store method
fd <- ExceptT $ updateRcvFileChunkReceived db (rcvChunkReplicaId replica) rcvChunkId rcvFileId chunkPath
let fileReceived = allChunksReceived fd
when fileReceived $
liftIO $ updateRcvFileStatus db rcvFileId RFSReceived
pure fileReceived
-- check if chunk is downloaded and not acknowledged via flag acknowledged?
-- or just catch and ignore error on acknowledgement? (and remove flag)
-- agentXFTPAckChunk c replicaKey (unChunkReplicaId replicaId) `catchError` \_ -> pure ()
when fileReceived $ addWorker c Nothing
where
allChunksReceived :: RcvFile -> Bool
allChunksReceived RcvFile {chunks} =
all (\RcvFileChunk {replicas} -> any received replicas) chunks
downloadFileChunk _ = throwError $ INTERNAL "no replica"
runXFTPLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m ()
runXFTPLocalWorker c@AgentClient {subQ} doWork = do
forever $ do
void . atomically $ readTMVar doWork
runXftpOperation
where
runXftpOperation :: m ()
runXftpOperation = do
nextFile <- withStore' c getNextRcvFileToDecrypt
case nextFile of
Nothing -> noWorkToDo
Just fd -> do
ri <- asks $ reconnectInterval . config
withRetryInterval ri $ \loop ->
decryptFile fd
`catchError` \e -> do
liftIO $ print e
-- TODO don't loop on permanent errors
-- TODO fixed number of retries instead of exponential backoff?
loop
noWorkToDo = void . atomically $ tryTakeTMVar doWork
decryptFile :: RcvFile -> m ()
decryptFile RcvFile {rcvFileId, key, nonce, tmpPath, saveDir, chunks} = do
-- TODO remove tmpPath if exists
withStore' c $ \db -> updateRcvFileStatus db rcvFileId RFSDecrypting
chunkPaths <- getChunkPaths chunks
encSize <- liftIO $ foldM (\s path -> (s +) . fromIntegral <$> getFileSize path) 0 chunkPaths
path <- decrypt encSize chunkPaths
whenM (doesPathExist tmpPath) $ removeDirectoryRecursive tmpPath
withStore' c $ \db -> updateRcvFileComplete db rcvFileId path
notify $ FRCVD rcvFileId path
where
notify :: ACommand 'Agent -> m ()
notify cmd = atomically $ writeTBQueue subQ ("", "", cmd)
getChunkPaths :: [RcvFileChunk] -> m [FilePath]
getChunkPaths [] = pure []
getChunkPaths (RcvFileChunk {chunkTmpPath = Just path} : cs) = do
ps <- getChunkPaths cs
pure $ path : ps
getChunkPaths (RcvFileChunk {chunkTmpPath = Nothing} : _cs) =
throwError $ INTERNAL "no chunk path"
decrypt :: Int64 -> [FilePath] -> m FilePath
decrypt encSize chunkPaths = do
lazyChunks <- readChunks chunkPaths
(authOk, f) <- liftEither . first cryptoError $ LC.sbDecryptTailTag key nonce (encSize - authTagSize) lazyChunks
let (fileHdr, f') = LB.splitAt 1024 f
-- withFile encPath ReadMode $ \r -> do
-- fileHdr <- liftIO $ B.hGet r 1024
case A.parse smpP $ LB.toStrict fileHdr of
-- TODO XFTP errors
A.Fail _ _ e -> throwError $ INTERNAL $ "Invalid file header: " <> e
A.Partial _ -> throwError $ INTERNAL "Invalid file header"
A.Done rest FileHeader {fileName} -> do
-- TODO touch file in agent bracket
path <- uniqueCombine saveDir fileName
liftIO $ LB.writeFile path $ LB.fromStrict rest <> f'
unless authOk $ do
removeFile path
throwError $ INTERNAL "Error decrypting file: incorrect auth tag"
pure path
readChunks :: [FilePath] -> m LB.ByteString
readChunks =
foldM
( \s path -> do
chunk <- liftIO $ LB.readFile path
pure $ s <> chunk
)
LB.empty
+28 -10
View File
@@ -16,6 +16,7 @@ import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Int (Int64)
import Data.List.NonEmpty (NonEmpty (..))
import Data.Time (UTCTime)
import Data.Word (Word32)
import qualified Network.HTTP.Types as N
import qualified Network.HTTP2.Client as H
@@ -34,6 +35,7 @@ import Simplex.Messaging.Client
import Simplex.Messaging.Client.Agent ()
import qualified Simplex.Messaging.Crypto as C
import qualified Simplex.Messaging.Crypto.Lazy as LC
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol
( BasicAuth,
Protocol (..),
@@ -42,7 +44,7 @@ import Simplex.Messaging.Protocol
SenderId,
)
import Simplex.Messaging.Transport (supportedParameters)
import Simplex.Messaging.Transport.Client (TransportClientConfig)
import Simplex.Messaging.Transport.Client (TransportClientConfig, TransportHost)
import Simplex.Messaging.Transport.HTTP2
import Simplex.Messaging.Transport.HTTP2.Client
import Simplex.Messaging.Util (bshow, liftEitherError, whenM)
@@ -51,11 +53,12 @@ import UnliftIO.Directory
data XFTPClient = XFTPClient
{ http2Client :: HTTP2Client,
transportSession :: TransportSession FileResponse,
config :: XFTPClientConfig
}
data XFTPClientConfig = XFTPClientConfig
{ networkConfig :: NetworkConfig,
{ xftpNetworkConfig :: NetworkConfig,
uploadTimeoutPerMb :: Int
}
@@ -77,26 +80,41 @@ type XFTPClientError = ProtocolClientError XFTPErrorType
defaultXFTPClientConfig :: XFTPClientConfig
defaultXFTPClientConfig =
XFTPClientConfig
{ networkConfig = defaultNetworkConfig,
{ xftpNetworkConfig = defaultNetworkConfig,
uploadTimeoutPerMb = 10000000 -- 10 seconds
}
getXFTPClient :: TransportSession FileResponse -> XFTPClientConfig -> IO () -> IO (Either XFTPClientError XFTPClient)
getXFTPClient transportSession@(_, srv, _) config@XFTPClientConfig {networkConfig} disconnected = runExceptT $ do
let tcConfig = transportClientConfig networkConfig
getXFTPClient :: TransportSession FileResponse -> XFTPClientConfig -> (XFTPClient -> IO ()) -> IO (Either XFTPClientError XFTPClient)
getXFTPClient transportSession@(_, srv, _) config@XFTPClientConfig {xftpNetworkConfig} disconnected = runExceptT $ do
let tcConfig = transportClientConfig xftpNetworkConfig
http2Config = xftpHTTP2Config tcConfig config
username = proxyUsername transportSession
ProtocolServer _ host port keyHash = srv
useHost <- liftEither $ chooseTransportHost networkConfig host
useHost <- liftEither $ chooseTransportHost xftpNetworkConfig host
clientVar <- newTVarIO Nothing
let usePort = if null port then "443" else port
http2Client <- liftEitherError xftpClientError $ getVerifiedHTTP2Client (Just username) useHost usePort (Just keyHash) Nothing http2Config disconnected
pure XFTPClient {http2Client, config}
clientDisconnected = readTVarIO clientVar >>= mapM_ disconnected
http2Client <- liftEitherError xftpClientError $ getVerifiedHTTP2Client (Just username) useHost usePort (Just keyHash) Nothing http2Config clientDisconnected
let c = XFTPClient {http2Client, transportSession, config}
atomically $ writeTVar clientVar $ Just c
pure c
closeXFTPClient :: XFTPClient -> IO ()
closeXFTPClient XFTPClient {http2Client} = closeHTTP2Client http2Client
xftpClientServer :: XFTPClient -> String
xftpClientServer = B.unpack . strEncode . snd3 . transportSession
where
snd3 (_, s, _) = s
xftpTransportHost :: XFTPClient -> TransportHost
xftpTransportHost = (host :: HClient -> TransportHost) . client_ . http2Client
xftpSessionTs :: XFTPClient -> UTCTime
xftpSessionTs = sessionTs . http2Client
xftpHTTP2Config :: TransportClientConfig -> XFTPClientConfig -> HTTP2ClientConfig
xftpHTTP2Config transportConfig XFTPClientConfig {networkConfig = NetworkConfig {tcpConnectTimeout}} =
xftpHTTP2Config transportConfig XFTPClientConfig {xftpNetworkConfig = NetworkConfig {tcpConnectTimeout}} =
defaultHTTP2ClientConfig
{ bodyHeadSize = xftpBlockSize,
suportedTLSParams = supportedParameters,
+4 -4
View File
@@ -68,8 +68,8 @@ getXFTPServerClient XFTPClientAgent {xftpClients, config} srv = do
first (XFTPClientAgentError srv)
<$> getXFTPClient (1, srv, Nothing) (xftpConfig config) clientDisconnected
clientDisconnected :: IO ()
clientDisconnected = do
clientDisconnected :: XFTPClient -> IO ()
clientDisconnected _ = do
atomically $ TM.delete srv xftpClients
logInfo $ "disconnected from " <> showServer srv
@@ -84,7 +84,7 @@ getXFTPServerClient XFTPClientAgent {xftpClients, config} srv = do
waitForXFTPClient :: XFTPClientVar -> ME XFTPClient
waitForXFTPClient clientVar = do
let XFTPClientConfig {networkConfig = NetworkConfig {tcpConnectTimeout}} = xftpConfig config
let XFTPClientConfig {xftpNetworkConfig = NetworkConfig {tcpConnectTimeout}} = xftpConfig config
client_ <- tcpConnectTimeout `timeout` atomically (readTMVar clientVar)
liftEither $ case client_ of
Just (Right c) -> Right c
@@ -121,7 +121,7 @@ closeXFTPServerClient XFTPClientAgent {xftpClients, config} srv =
atomically (TM.lookupDelete srv xftpClients) >>= mapM_ closeClient
where
closeClient cVar = do
let NetworkConfig {tcpConnectTimeout} = networkConfig $ xftpConfig config
let NetworkConfig {tcpConnectTimeout} = xftpNetworkConfig $ xftpConfig config
tcpConnectTimeout `timeout` atomically (readTMVar cVar) >>= \case
Just (Right client) -> closeXFTPClient client `catchAll_` pure ()
_ -> pure ()
+3 -26
View File
@@ -43,6 +43,8 @@ import Simplex.FileTransfer.Client.Agent
import Simplex.FileTransfer.Description
import Simplex.FileTransfer.Protocol
import Simplex.FileTransfer.Transport (XFTPRcvChunkSpec (..))
import Simplex.FileTransfer.Types
import Simplex.FileTransfer.Util (uniqueCombine)
import qualified Simplex.Messaging.Crypto as C
import qualified Simplex.Messaging.Crypto.Lazy as LC
import Simplex.Messaging.Encoding
@@ -52,7 +54,7 @@ import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), SenderId, SndPrivat
import Simplex.Messaging.Server.CLI (getCliCommand')
import Simplex.Messaging.Util (ifM, tshow, whenM)
import System.Exit (exitFailure)
import System.FilePath (splitExtensions, splitFileName, (</>))
import System.FilePath (splitFileName, (</>))
import System.IO.Temp (getCanonicalTemporaryDirectory)
import System.Random (StdGen, newStdGen, randomR)
import UnliftIO
@@ -79,9 +81,6 @@ maxFileSizeStr = B.unpack . strEncode $ FileSize maxFileSize
fileSizeLen :: Int64
fileSizeLen = 8
authTagSize :: Int64
authTagSize = fromIntegral C.authTagSize
newtype CLIError = CLIError String
deriving (Eq, Show, Exception)
@@ -257,19 +256,6 @@ runE a =
Left (CLIError e) -> putStrLn e >> exitFailure
_ -> pure ()
-- fileExtra is added to allow header extension in future versions
data FileHeader = FileHeader
{ fileName :: String,
fileExtra :: Maybe String
}
deriving (Eq, Show)
instance Encoding FileHeader where
smpEncode FileHeader {fileName, fileExtra} = smpEncode (fileName, fileExtra)
smpP = do
(fileName, fileExtra) <- smpP
pure FileHeader {fileName, fileExtra}
cliSendFile :: SendOptions -> ExceptT CLIError IO ()
cliSendFile SendOptions {filePath, outputDir, numRecipients, xftpServers, retryCount, tempPath, verbose} = do
let (_, fileName) = splitFileName filePath
@@ -578,15 +564,6 @@ prepareChunkSpecs filePath chunkSizes = reverse . snd $ foldl' addSpec (0, []) c
getEncPath :: MonadIO m => Maybe FilePath -> String -> m FilePath
getEncPath path name = (`uniqueCombine` (name <> ".encrypted")) =<< maybe (liftIO getCanonicalTemporaryDirectory) pure path
uniqueCombine :: MonadIO m => FilePath -> String -> m FilePath
uniqueCombine filePath fileName = tryCombine (0 :: Int)
where
tryCombine n =
let (name, ext) = splitExtensions fileName
suffix = if n == 0 then "" else "_" <> show n
f = filePath </> (name <> suffix <> ext)
in ifM (doesPathExist f) (tryCombine $ n + 1) (pure f)
withReconnect :: Show e => XFTPClientAgent -> XFTPServer -> Int -> (XFTPClient -> ExceptT e IO a) -> ExceptT CLIError IO a
withReconnect a srv n run = withRetry n $ do
c <- withRetry n $ getXFTPServerClient a srv
+14
View File
@@ -39,6 +39,8 @@ import Data.Aeson (FromJSON, ToJSON)
import qualified Data.Aeson as J
import Data.Attoparsec.ByteString.Char8 (Parser)
import qualified Data.Attoparsec.ByteString.Char8 as A
import Database.SQLite.Simple.FromField (FromField (..))
import Database.SQLite.Simple.ToField (ToField (..))
import Data.Bifunctor (first)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
@@ -96,6 +98,10 @@ instance ToJSON FileDigest where
toJSON = strToJSON
toEncoding = strToJEncoding
instance FromField FileDigest where fromField f = FileDigest <$> fromField f
instance ToField FileDigest where toField (FileDigest s) = toField s
data FileChunk = FileChunk
{ chunkNo :: Int,
chunkSize :: FileSize Word32,
@@ -125,6 +131,10 @@ instance ToJSON ChunkReplicaId where
toJSON = strToJSON
toEncoding = strToJEncoding
instance FromField ChunkReplicaId where fromField f = ChunkReplicaId <$> fromField f
instance ToField ChunkReplicaId where toField (ChunkReplicaId s) = toField s
data YAMLFileDescription = YAMLFileDescription
{ party :: FileParty,
size :: String,
@@ -226,6 +236,10 @@ gb n = 1024 * mb n
instance (Integral a, Show a) => IsString (FileSize a) where
fromString = either error id . strDecode . B.pack
instance (FromField a) => FromField (FileSize a) where fromField f = FileSize <$> fromField f
instance (ToField a) => ToField (FileSize a) where toField (FileSize s) = toField s
groupReplicasByServer :: FileSize Word32 -> [FileChunk] -> [[FileServerReplica]]
groupReplicasByServer defChunkSize =
groupBy ((==) `on` replicaServer)
+16 -1
View File
@@ -13,6 +13,7 @@
module Simplex.FileTransfer.Protocol where
import Control.Applicative ((<|>))
import Data.Aeson (FromJSON, ToJSON)
import qualified Data.Aeson as J
import qualified Data.Attoparsec.ByteString.Char8 as A
@@ -25,6 +26,7 @@ import Data.Maybe (isNothing)
import Data.Type.Equality
import Data.Word (Word32)
import GHC.Generics (Generic)
import Generic.Random (genericArbitraryU)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
@@ -55,8 +57,9 @@ import Simplex.Messaging.Protocol
_smpP,
)
import Simplex.Messaging.Transport (SessionId, TransportError (..))
import Simplex.Messaging.Util ((<$?>))
import Simplex.Messaging.Util (bshow, (<$?>))
import Simplex.Messaging.Version
import Test.QuickCheck (Arbitrary (..))
currentXFTPVersion :: Version
currentXFTPVersion = 1
@@ -354,6 +357,18 @@ data XFTPErrorType
DUPLICATE_ -- not part of SMP protocol, used internally
deriving (Eq, Generic, Read, Show)
instance ToJSON XFTPErrorType where
toJSON = J.genericToJSON $ sumTypeJSON id
toEncoding = J.genericToEncoding $ sumTypeJSON id
instance StrEncoding XFTPErrorType where
strEncode = \case
CMD e -> "CMD " <> bshow e
e -> bshow e
strP = "CMD " *> (CMD <$> parseRead1) <|> parseRead1
instance Arbitrary XFTPErrorType where arbitrary = genericArbitraryU
instance Encoding XFTPErrorType where
smpEncode = \case
BLOCK -> "BLOCK"
+102
View File
@@ -0,0 +1,102 @@
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
module Simplex.FileTransfer.Types where
import Data.Int (Int64)
import Data.Word (Word32)
import Database.SQLite.Simple.FromField (FromField (..))
import Database.SQLite.Simple.ToField (ToField (..))
import Simplex.FileTransfer.Description
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Parsers (fromTextField_)
import Simplex.Messaging.Protocol
authTagSize :: Int64
authTagSize = fromIntegral C.authTagSize
-- fileExtra is added to allow header extension in future versions
data FileHeader = FileHeader
{ fileName :: String,
fileExtra :: Maybe String
}
deriving (Eq, Show)
instance Encoding FileHeader where
smpEncode FileHeader {fileName, fileExtra} = smpEncode (fileName, fileExtra)
smpP = do
(fileName, fileExtra) <- smpP
pure FileHeader {fileName, fileExtra}
type RcvFileId = Int64
data RcvFile = RcvFile
{ userId :: Int64,
rcvFileId :: RcvFileId,
size :: FileSize Int64,
digest :: FileDigest,
key :: C.SbKey,
nonce :: C.CbNonce,
chunkSize :: FileSize Word32,
chunks :: [RcvFileChunk],
tmpPath :: FilePath,
saveDir :: FilePath,
savePath :: Maybe FilePath,
status :: RcvFileStatus,
status :: RcvFileStatus
}
deriving (Eq, Show)
-- TODO add error status?
data RcvFileStatus
= RFSReceiving
| RFSReceived
| RFSDecrypting
| RFSComplete
deriving (Eq, Show)
instance FromField RcvFileStatus where fromField = fromTextField_ textDecode
instance ToField RcvFileStatus where toField = toField . textEncode
instance TextEncoding RcvFileStatus where
textDecode = \case
"receiving" -> Just RFSReceiving
"received" -> Just RFSReceived
"decrypting" -> Just RFSDecrypting
"complete" -> Just RFSComplete
_ -> Nothing
textEncode = \case
RFSReceiving -> "receiving"
RFSReceived -> "received"
RFSDecrypting -> "decrypting"
RFSComplete -> "complete"
data RcvFileChunk = RcvFileChunk
{ userId :: Int64,
rcvFileId :: RcvFileId,
rcvChunkId :: Int64,
chunkNo :: Int,
chunkSize :: FileSize Word32,
digest :: FileDigest,
replicas :: [RcvFileChunkReplica],
fileTmpPath :: FilePath,
chunkTmpPath :: Maybe FilePath,
nextDelay :: Maybe Int
}
deriving (Eq, Show)
data RcvFileChunkReplica = RcvFileChunkReplica
{ rcvChunkReplicaId :: Int64,
server :: XFTPServer,
replicaId :: ChunkReplicaId,
replicaKey :: C.APrivateSignKey,
received :: Bool,
acknowledged :: Bool,
retries :: Int
}
deriving (Eq, Show)
+18
View File
@@ -0,0 +1,18 @@
module Simplex.FileTransfer.Util
( uniqueCombine,
)
where
import Simplex.Messaging.Util (ifM)
import System.FilePath (splitExtensions, (</>))
import UnliftIO
import UnliftIO.Directory
uniqueCombine :: MonadIO m => FilePath -> String -> m FilePath
uniqueCombine filePath fileName = tryCombine (0 :: Int)
where
tryCombine n =
let (name, ext) = splitExtensions fileName
suffix = if n == 0 then "" else "_" <> show n
f = filePath </> (name <> suffix <> ext)
in ifM (doesPathExist f) (tryCombine $ n + 1) (pure f)
+9
View File
@@ -80,6 +80,7 @@ module Simplex.Messaging.Agent
getNtfToken,
getNtfTokenData,
toggleConnectionNtfs,
xftpReceiveFile,
activateAgent,
suspendAgent,
execAgentStoreSQL,
@@ -113,6 +114,10 @@ import qualified Data.Text as T
import Data.Time.Clock
import Data.Time.Clock.System (systemToUTCTime)
import qualified Database.SQLite.Simple as DB
import Simplex.FileTransfer.Agent (receiveFile)
import Simplex.FileTransfer.Description (ValidFileDescription)
import Simplex.FileTransfer.Protocol (FileParty (..))
import Simplex.FileTransfer.Types (RcvFileId)
import Simplex.Messaging.Agent.Client
import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.Lock (withLock)
@@ -322,6 +327,10 @@ getNtfTokenData c = withAgentEnv c $ getNtfTokenData' c
toggleConnectionNtfs :: AgentErrorMonad m => AgentClient -> ConnId -> Bool -> m ()
toggleConnectionNtfs c = withAgentEnv c .: toggleConnectionNtfs' c
-- | Receive XFTP file
xftpReceiveFile :: AgentErrorMonad m => AgentClient -> UserId -> ValidFileDescription 'FPRecipient -> FilePath -> m RcvFileId
xftpReceiveFile c = withAgentEnv c .:. receiveFile c
-- | Activate operations
activateAgent :: AgentErrorMonad m => AgentClient -> m ()
activateAgent c = withAgentEnv c $ activateAgent' c
+105 -25
View File
@@ -15,6 +15,7 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilyDependencies #-}
module Simplex.Messaging.Agent.Client
( AgentClient (..),
@@ -48,6 +49,7 @@ module Simplex.Messaging.Agent.Client
agentNtfCreateSubscription,
agentNtfCheckSubscription,
agentNtfDeleteSubscription,
agentXFTPDownloadChunk,
agentCbEncrypt,
agentCbDecrypt,
cryptoError,
@@ -108,10 +110,17 @@ import Data.Maybe (isJust, listToMaybe)
import Data.Set (Set)
import qualified Data.Set as S
import Data.Text.Encoding
import Data.Time (UTCTime)
import Data.Word (Word16)
import qualified Database.SQLite.Simple as DB
import GHC.Generics (Generic)
import Network.Socket (HostName)
import Simplex.FileTransfer.Client (XFTPClient, XFTPClientConfig (..))
import qualified Simplex.FileTransfer.Client as X
import Simplex.FileTransfer.Description (ChunkReplicaId (..))
import Simplex.FileTransfer.Protocol (FileResponse, XFTPErrorType)
import Simplex.FileTransfer.Transport (XFTPRcvChunkSpec)
import Simplex.FileTransfer.Types (RcvFileChunkReplica (..))
import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.Lock
import Simplex.Messaging.Agent.Protocol
@@ -151,6 +160,7 @@ import Simplex.Messaging.Protocol
RcvNtfPublicDhKey,
SMPMsgMeta (..),
SndPublicVerifyKey,
XFTPServerWithAuth,
)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.TMap (TMap)
@@ -163,16 +173,20 @@ import UnliftIO (mapConcurrently)
import qualified UnliftIO.Exception as E
import UnliftIO.STM
type ClientVar err msg = TMVar (Either AgentErrorType (ProtocolClient err msg))
type ClientVar msg = TMVar (Either AgentErrorType (Client msg))
type SMPClientVar = TMVar (Either AgentErrorType SMPClient)
type SMPClientVar = ClientVar SMP.BrokerMsg
type NtfClientVar = TMVar (Either AgentErrorType NtfClient)
type NtfClientVar = ClientVar NtfResponse
type XFTPClientVar = TMVar (Either AgentErrorType XFTPClient)
type SMPTransportSession = TransportSession SMP.BrokerMsg
type NtfTransportSession = TransportSession NtfResponse
type XFTPTransportSession = TransportSession FileResponse
data AgentClient = AgentClient
{ active :: TVar Bool,
rcvQ :: TBQueue (ATransmission 'Client),
@@ -182,6 +196,8 @@ data AgentClient = AgentClient
smpClients :: TMap SMPTransportSession SMPClientVar,
ntfServers :: TVar [NtfServer],
ntfClients :: TMap NtfTransportSession NtfClientVar,
xftpServers :: TMap UserId (NonEmpty XFTPServerWithAuth),
xftpClients :: TMap XFTPTransportSession XFTPClientVar,
useNetworkConfig :: TVar NetworkConfig,
subscrConns :: TVar (Set ConnId),
activeSubs :: TRcvQueues,
@@ -246,7 +262,7 @@ data AgentStatsKey = AgentStatsKey
deriving (Eq, Ord, Show)
newAgentClient :: InitialAgentServers -> Env -> STM AgentClient
newAgentClient InitialAgentServers {smp, ntf, netCfg} agentEnv = do
newAgentClient InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = do
let qSize = tbqSize $ config agentEnv
active <- newTVar True
rcvQ <- newTBQueue qSize
@@ -256,6 +272,8 @@ newAgentClient InitialAgentServers {smp, ntf, netCfg} agentEnv = do
smpClients <- TM.empty
ntfServers <- newTVar ntf
ntfClients <- TM.empty
xftpServers <- newTVar xftp
xftpClients <- TM.empty
useNetworkConfig <- newTVar netCfg
subscrConns <- newTVar S.empty
activeSubs <- RQ.empty
@@ -290,6 +308,8 @@ newAgentClient InitialAgentServers {smp, ntf, netCfg} agentEnv = do
smpClients,
ntfServers,
ntfClients,
xftpServers,
xftpClients,
useNetworkConfig,
subscrConns,
activeSubs,
@@ -320,17 +340,41 @@ newAgentClient InitialAgentServers {smp, ntf, netCfg} agentEnv = do
agentClientStore :: AgentClient -> SQLiteStore
agentClientStore AgentClient {agentEnv = Env {store}} = store
class ProtocolServerClient err msg | msg -> err where
getProtocolServerClient :: AgentMonad m => AgentClient -> TransportSession msg -> m (ProtocolClient err msg)
class (Encoding err, Show err) => ProtocolServerClient err msg | msg -> err where
type Client msg = c | c -> msg
getProtocolServerClient :: AgentMonad m => AgentClient -> TransportSession msg -> m (Client msg)
clientProtocolError :: err -> AgentErrorType
closeProtocolServerClient :: Client msg -> IO ()
clientServer :: Client msg -> String
clientTransportHost :: Client msg -> TransportHost
clientSessionTs :: Client msg -> UTCTime
instance ProtocolServerClient ErrorType BrokerMsg where
type Client BrokerMsg = ProtocolClient ErrorType BrokerMsg
getProtocolServerClient = getSMPServerClient
clientProtocolError = SMP
closeProtocolServerClient = closeProtocolClient
clientServer = protocolClientServer
clientTransportHost = transportHost'
clientSessionTs = sessionTs
instance ProtocolServerClient ErrorType NtfResponse where
type Client NtfResponse = ProtocolClient ErrorType NtfResponse
getProtocolServerClient = getNtfServerClient
clientProtocolError = NTF
closeProtocolServerClient = closeProtocolClient
clientServer = protocolClientServer
clientTransportHost = transportHost'
clientSessionTs = sessionTs
instance ProtocolServerClient XFTPErrorType FileResponse where
type Client FileResponse = XFTPClient
getProtocolServerClient = getXFTPServerClient
clientProtocolError = XFTP
closeProtocolServerClient = X.closeXFTPClient
clientServer = X.xftpClientServer
clientTransportHost = X.xftpTransportHost
clientSessionTs = X.xftpSessionTs
getSMPServerClient :: forall m. AgentMonad m => AgentClient -> SMPTransportSession -> m SMPClient
getSMPServerClient c@AgentClient {active, smpClients, msgQ} tSess@(userId, srv, _) = do
@@ -420,6 +464,27 @@ getNtfServerClient c@AgentClient {active, ntfClients} tSess@(userId, srv, _) = d
atomically $ writeTBQueue (subQ c) ("", "", hostEvent DISCONNECT client)
logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv
getXFTPServerClient :: forall m. AgentMonad m => AgentClient -> XFTPTransportSession -> m XFTPClient
getXFTPServerClient c@AgentClient {active, xftpClients, useNetworkConfig} tSess@(userId, srv, _) = do
unlessM (readTVarIO active) . throwError $ INTERNAL "agent is stopped"
atomically (getClientVar tSess xftpClients)
>>= either
(newProtocolClient c tSess xftpClients connectClient $ \_ _ -> pure ())
(waitForProtocolClient c tSess)
where
connectClient :: m XFTPClient
connectClient = do
cfg <- asks $ xftpCfg . config
xftpNetworkConfig <- readTVarIO useNetworkConfig
liftEitherError (protocolClientError XFTP $ B.unpack $ strEncode srv) (X.getXFTPClient tSess cfg {xftpNetworkConfig} clientDisconnected)
clientDisconnected :: XFTPClient -> IO ()
clientDisconnected client = do
atomically $ TM.delete tSess xftpClients
incClientStat c userId client "DISCONNECT" ""
atomically $ writeTBQueue (subQ c) ("", "", hostEvent DISCONNECT client)
logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv
getClientVar :: forall a s. TransportSession s -> TMap (TransportSession s) (TMVar a) -> STM (Either (TMVar a) (TMVar a))
getClientVar tSess clients = maybe (Left <$> newClientVar) (pure . Right) =<< TM.lookup tSess clients
where
@@ -429,7 +494,7 @@ getClientVar tSess clients = maybe (Left <$> newClientVar) (pure . Right) =<< TM
TM.insert tSess var clients
pure var
waitForProtocolClient :: (AgentMonad m, ProtocolTypeI (ProtoType msg)) => AgentClient -> TransportSession msg -> ClientVar err msg -> m (ProtocolClient err msg)
waitForProtocolClient :: (AgentMonad m, ProtocolTypeI (ProtoType msg)) => AgentClient -> TransportSession msg -> ClientVar msg -> m (Client msg)
waitForProtocolClient c (_, srv, _) clientVar = do
NetworkConfig {tcpConnectTimeout} <- readTVarIO $ useNetworkConfig c
client_ <- liftIO $ tcpConnectTimeout `timeout` atomically (readTMVar clientVar)
@@ -440,17 +505,17 @@ waitForProtocolClient c (_, srv, _) clientVar = do
newProtocolClient ::
forall err msg m.
(AgentMonad m, ProtocolTypeI (ProtoType msg)) =>
(AgentMonad m, ProtocolTypeI (ProtoType msg), ProtocolServerClient err msg) =>
AgentClient ->
TransportSession msg ->
TMap (TransportSession msg) (ClientVar err msg) ->
m (ProtocolClient err msg) ->
TMap (TransportSession msg) (ClientVar msg) ->
m (Client msg) ->
(AgentClient -> TransportSession msg -> m ()) ->
ClientVar err msg ->
m (ProtocolClient err msg)
ClientVar msg ->
m (Client msg)
newProtocolClient c tSess@(userId, srv, entityId_) clients connectClient reconnectClient clientVar = tryConnectClient pure tryConnectAsync
where
tryConnectClient :: (ProtocolClient err msg -> m a) -> m () -> m a
tryConnectClient :: (Client msg -> m a) -> m () -> m a
tryConnectClient successAction retryAction =
tryError connectClient >>= \r -> case r of
Right client -> do
@@ -475,8 +540,8 @@ newProtocolClient c tSess@(userId, srv, entityId_) clients connectClient reconne
withRetryInterval ri $ \loop -> void $ tryConnectClient (const $ reconnectClient c tSess) loop
atomically . removeAsyncAction aId $ asyncClients c
hostEvent :: forall err msg. ProtocolTypeI (ProtoType msg) => (AProtocolType -> TransportHost -> ACommand 'Agent) -> ProtocolClient err msg -> ACommand 'Agent
hostEvent event client = event (AProtocolType $ protocolTypeI @(ProtoType msg)) $ transportHost' client
hostEvent :: forall err msg. (ProtocolTypeI (ProtoType msg), ProtocolServerClient err msg) => (AProtocolType -> TransportHost -> ACommand 'Agent) -> Client msg -> ACommand 'Agent
hostEvent event = event (AProtocolType $ protocolTypeI @(ProtoType msg)) . clientTransportHost
getClientConfig :: AgentMonad m => AgentClient -> (AgentConfig -> ProtocolClientConfig) -> m ProtocolClientConfig
getClientConfig AgentClient {useNetworkConfig} cfgSel = do
@@ -519,7 +584,7 @@ throwWhenNoDelivery c SndQueue {server, sndId} =
where
k = (server, sndId)
closeProtocolServerClients :: AgentClient -> (AgentClient -> TMap (TransportSession msg) (ClientVar err msg)) -> IO ()
closeProtocolServerClients :: ProtocolServerClient err msg => AgentClient -> (AgentClient -> TMap (TransportSession msg) (ClientVar msg)) -> IO ()
closeProtocolServerClients c clientsSel =
atomically (swapTVar cs M.empty) >>= mapM_ (forkIO . closeClient)
where
@@ -527,7 +592,7 @@ closeProtocolServerClients c clientsSel =
closeClient cVar = do
NetworkConfig {tcpConnectTimeout} <- readTVarIO $ useNetworkConfig c
tcpConnectTimeout `timeout` atomically (readTMVar cVar) >>= \case
Just (Right client) -> closeProtocolClient client `catchAll_` pure ()
Just (Right client) -> closeProtocolServerClient client `catchAll_` pure ()
_ -> pure ()
cancelActions :: (Foldable f, Monoid (f (Async ()))) => TVar (f (Async ())) -> IO ()
@@ -542,29 +607,29 @@ withLockMap_ locks key = withGetLock $ TM.lookup key locks >>= maybe newLock pur
where
newLock = createLock >>= \l -> TM.insert key l locks $> l
withClient_ :: forall a m err msg. (AgentMonad m, ProtocolServerClient err msg) => AgentClient -> TransportSession msg -> ByteString -> (ProtocolClient err msg -> m a) -> m a
withClient_ :: forall a m err msg. (AgentMonad m, ProtocolServerClient err msg) => AgentClient -> TransportSession msg -> ByteString -> (Client msg -> m a) -> m a
withClient_ c tSess@(userId, srv, _) statCmd action = do
cl <- getProtocolServerClient c tSess
(action cl <* stat cl "OK") `catchError` logServerError cl
where
stat cl = liftIO . incClientStat c userId cl statCmd
logServerError :: ProtocolClient err msg -> AgentErrorType -> m a
logServerError :: Client msg -> AgentErrorType -> m a
logServerError cl e = do
logServer "<--" c srv "" $ strEncode e
stat cl $ strEncode e
throwError e
withLogClient_ :: (AgentMonad m, ProtocolServerClient err msg) => AgentClient -> TransportSession msg -> EntityId -> ByteString -> (ProtocolClient err msg -> m a) -> m a
withLogClient_ :: (AgentMonad m, ProtocolServerClient err msg) => AgentClient -> TransportSession msg -> EntityId -> ByteString -> (Client msg -> m a) -> m a
withLogClient_ c tSess@(_, srv, _) entId cmdStr action = do
logServer "-->" c srv entId cmdStr
res <- withClient_ c tSess cmdStr action
logServer "<--" c srv entId "OK"
return res
withClient :: forall m err msg a. (AgentMonad m, ProtocolServerClient err msg, ProtocolTypeI (ProtoType msg), Encoding err, Show err) => AgentClient -> TransportSession msg -> ByteString -> (ProtocolClient err msg -> ExceptT (ProtocolClientError err) IO a) -> m a
withClient :: forall m err msg a. (AgentMonad m, ProtocolServerClient err msg) => AgentClient -> TransportSession msg -> ByteString -> (Client msg -> ExceptT (ProtocolClientError err) IO a) -> m a
withClient c tSess statKey action = withClient_ c tSess statKey $ \client -> liftClient (clientProtocolError @err @msg) (clientServer client) $ action client
withLogClient :: forall m err msg a. (AgentMonad m, ProtocolServerClient err msg, ProtocolTypeI (ProtoType msg), Encoding err, Show err) => AgentClient -> TransportSession msg -> EntityId -> ByteString -> (ProtocolClient err msg -> ExceptT (ProtocolClientError err) IO a) -> m a
withLogClient :: forall m err msg a. (AgentMonad m, ProtocolServerClient err msg) => AgentClient -> TransportSession msg -> EntityId -> ByteString -> (Client msg -> ExceptT (ProtocolClientError err) IO a) -> m a
withLogClient c tSess entId cmdStr action = withLogClient_ c tSess entId cmdStr $ \client -> liftClient (clientProtocolError @err @msg) (clientServer client) $ action client
withSMPClient :: (AgentMonad m, SMPQueueRec q) => AgentClient -> q -> ByteString -> (SMPClient -> ExceptT SMPClientError IO a) -> m a
@@ -580,6 +645,17 @@ withSMPClient_ c q cmdStr action = do
withNtfClient :: forall m a. AgentMonad m => AgentClient -> NtfServer -> EntityId -> ByteString -> (NtfClient -> ExceptT NtfClientError IO a) -> m a
withNtfClient c srv = withLogClient c (0, srv, Nothing)
withXFTPClient ::
(AgentMonad m, ProtocolServerClient err msg) =>
AgentClient ->
(UserId, ProtoServer msg, EntityId) ->
ByteString ->
(Client msg -> ExceptT (ProtocolClientError err) IO b) ->
m b
withXFTPClient c (userId, srv, fId) cmdStr action = do
tSess <- mkTransportSession c userId srv fId
withLogClient c tSess (strEncode fId) cmdStr action
liftClient :: (AgentMonad m, Show err, Encoding err) => (err -> AgentErrorType) -> HostName -> ExceptT (ProtocolClientError err) IO a -> m a
liftClient protocolError_ = liftError . protocolClientError protocolError_
@@ -912,6 +988,10 @@ agentNtfDeleteSubscription :: AgentMonad m => AgentClient -> NtfSubscriptionId -
agentNtfDeleteSubscription c subId NtfToken {ntfServer, ntfPrivKey} =
withNtfClient c ntfServer subId "SDEL" $ \ntf -> ntfDeleteSubscription ntf ntfPrivKey subId
agentXFTPDownloadChunk :: AgentMonad m => AgentClient -> UserId -> RcvFileChunkReplica -> XFTPRcvChunkSpec -> m ()
agentXFTPDownloadChunk c userId RcvFileChunkReplica {server, replicaId = ChunkReplicaId fId, replicaKey} chunkSpec =
withXFTPClient c (userId, server, fId) "FGET" $ \xftp -> X.downloadXFTPChunk xftp replicaKey fId chunkSpec
agentCbEncrypt :: AgentMonad m => SndQueue -> Maybe C.PublicKeyX25519 -> ByteString -> m ByteString
agentCbEncrypt SndQueue {e2eDhSecret, smpClientVersion} e2ePubKey msg = do
cmNonce <- liftIO C.randomCbNonce
@@ -1041,7 +1121,7 @@ incStat AgentClient {agentStats} n k = do
Just v -> modifyTVar' v (+ n)
_ -> newTVar n >>= \v -> TM.insert k v agentStats
incClientStat :: AgentClient -> UserId -> ProtocolClient err msg -> ByteString -> ByteString -> IO ()
incClientStat :: ProtocolServerClient err msg => AgentClient -> UserId -> Client msg -> ByteString -> ByteString -> IO ()
incClientStat c userId pc = incClientStatN c userId pc 1
incServerStat :: AgentClient -> UserId -> ProtocolServer p -> ByteString -> ByteString -> IO ()
@@ -1051,8 +1131,8 @@ incServerStat c userId ProtocolServer {host} cmd res = do
where
statsKey = AgentStatsKey {userId, host = strEncode $ L.head host, clientTs = "", cmd, res}
incClientStatN :: AgentClient -> UserId -> ProtocolClient err msg -> Int -> ByteString -> ByteString -> IO ()
incClientStatN :: ProtocolServerClient err msg => AgentClient -> UserId -> Client msg -> Int -> ByteString -> ByteString -> IO ()
incClientStatN c userId pc n cmd res = do
atomically $ incStat c n statsKey
where
statsKey = AgentStatsKey {userId, host = strEncode $ transportHost' pc, clientTs = strEncode $ sessionTs pc, cmd, res}
statsKey = AgentStatsKey {userId, host = strEncode $ clientTransportHost pc, clientTs = strEncode $ clientSessionTs pc, cmd, res}
+19 -3
View File
@@ -24,6 +24,7 @@ module Simplex.Messaging.Agent.Env.SQLite
createAgentStore,
NtfSupervisor (..),
NtfSupervisorCommand (..),
XFTPAgent (..),
)
where
@@ -37,6 +38,7 @@ import Data.Time.Clock (NominalDiffTime, nominalDay)
import Data.Word (Word16)
import Network.Socket
import Numeric.Natural
import Simplex.FileTransfer.Client (XFTPClientConfig (..), defaultXFTPClientConfig)
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Store (UserId)
@@ -47,7 +49,7 @@ import Simplex.Messaging.Client.Agent ()
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.Ratchet (supportedE2EEncryptVRange)
import Simplex.Messaging.Notifications.Types
import Simplex.Messaging.Protocol (NtfServer, supportedSMPClientVRange)
import Simplex.Messaging.Protocol (NtfServer, XFTPServer, XFTPServerWithAuth, supportedSMPClientVRange)
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (TLS, Transport (..))
@@ -63,6 +65,7 @@ type AgentMonad m = (MonadUnliftIO m, MonadReader Env m, MonadError AgentErrorTy
data InitialAgentServers = InitialAgentServers
{ smp :: Map UserId (NonEmpty SMPServerWithAuth),
ntf :: [NtfServer],
xftp :: Map UserId (NonEmpty XFTPServerWithAuth),
netCfg :: NetworkConfig
}
@@ -84,6 +87,7 @@ data AgentConfig = AgentConfig
yesToMigrations :: Bool,
smpCfg :: ProtocolClientConfig,
ntfCfg :: ProtocolClientConfig,
xftpCfg :: XFTPClientConfig,
reconnectInterval :: RetryInterval,
messageRetryInterval :: RetryInterval2,
messageTimeout :: NominalDiffTime,
@@ -144,6 +148,7 @@ defaultAgentConfig =
yesToMigrations = False,
smpCfg = defaultClientConfig {defaultTransport = (show defaultSMPPort, transport @TLS)},
ntfCfg = defaultClientConfig {defaultTransport = ("443", transport @TLS)},
xftpCfg = defaultXFTPClientConfig,
reconnectInterval = defaultReconnectInterval,
messageRetryInterval = defaultMessageRetryInterval,
messageTimeout = 2 * nominalDay,
@@ -173,7 +178,8 @@ data Env = Env
idsDrg :: TVar ChaChaDRG,
clientCounter :: TVar Int,
randomServer :: TVar StdGen,
ntfSupervisor :: NtfSupervisor
ntfSupervisor :: NtfSupervisor,
xftpAgent :: XFTPAgent
}
newSMPAgentEnv :: (MonadUnliftIO m, MonadRandom m) => AgentConfig -> m Env
@@ -185,7 +191,8 @@ newSMPAgentEnv config@AgentConfig {database, yesToMigrations, initialClientId} =
clientCounter <- newTVarIO initialClientId
randomServer <- newTVarIO =<< liftIO newStdGen
ntfSupervisor <- atomically . newNtfSubSupervisor $ tbqSize config
return Env {config, store, idsDrg, clientCounter, randomServer, ntfSupervisor}
xftpAgent <- atomically newXFTPAgent
return Env {config, store, idsDrg, clientCounter, randomServer, ntfSupervisor, xftpAgent}
createAgentStore :: FilePath -> String -> Bool -> IO SQLiteStore
createAgentStore dbFilePath dbKey = createSQLiteStore dbFilePath dbKey Migrations.app
@@ -207,3 +214,12 @@ newNtfSubSupervisor qSize = do
ntfWorkers <- TM.empty
ntfSMPWorkers <- TM.empty
pure NtfSupervisor {ntfTkn, ntfSubQ, ntfWorkers, ntfSMPWorkers}
data XFTPAgent = XFTPAgent
{ xftpWorkers :: TMap (Maybe XFTPServer) (TMVar (), Async ())
}
newXFTPAgent :: STM XFTPAgent
newXFTPAgent = do
xftpWorkers <- TM.empty
pure XFTPAgent {xftpWorkers}
+12
View File
@@ -155,6 +155,8 @@ import Database.SQLite.Simple.FromField
import Database.SQLite.Simple.ToField
import GHC.Generics (Generic)
import Generic.Random (genericArbitraryU)
import Simplex.FileTransfer.Protocol (XFTPErrorType)
import Simplex.FileTransfer.Types (RcvFileId)
import Simplex.Messaging.Agent.QueryString
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.Ratchet (E2ERatchetParams, E2ERatchetParamsUri)
@@ -282,6 +284,7 @@ data ACommand (p :: AParty) where
OK :: ACommand Agent
ERR :: AgentErrorType -> ACommand Agent
SUSPENDED :: ACommand Agent
FRCVD :: RcvFileId -> FilePath -> ACommand Agent
deriving instance Eq (ACommand p)
@@ -324,6 +327,7 @@ data ACommandTag (p :: AParty) where
OK_ :: ACommandTag Agent
ERR_ :: ACommandTag Agent
SUSPENDED_ :: ACommandTag Agent
FRCVD_ :: ACommandTag Agent
deriving instance Eq (ACommandTag p)
@@ -365,6 +369,7 @@ aCommandTag = \case
OK -> OK_
ERR _ -> ERR_
SUSPENDED -> SUSPENDED_
FRCVD {} -> FRCVD_
data QueueDirection = QDRcv | QDSnd
deriving (Eq, Show)
@@ -1072,6 +1077,8 @@ data AgentErrorType
SMP {smpErr :: ErrorType}
| -- | NTF protocol errors forwarded to agent clients
NTF {ntfErr :: ErrorType}
| -- | XFTP protocol errors forwarded to agent clients
XFTP {xftpErr :: XFTPErrorType}
| -- | SMP server errors
BROKER {brokerAddress :: String, brokerErr :: BrokerErrorType}
| -- | errors of other agents
@@ -1166,6 +1173,7 @@ instance StrEncoding AgentErrorType where
<|> "CONN " *> (CONN <$> parseRead1)
<|> "SMP " *> (SMP <$> strP)
<|> "NTF " *> (NTF <$> strP)
<|> "XFTP " *> (XFTP <$> strP)
<|> "BROKER " *> (BROKER <$> textP <* " RESPONSE " <*> (RESPONSE <$> textP))
<|> "BROKER " *> (BROKER <$> textP <* " TRANSPORT " <*> (TRANSPORT <$> transportErrorP))
<|> "BROKER " *> (BROKER <$> textP <* A.space <*> parseRead1)
@@ -1179,6 +1187,7 @@ instance StrEncoding AgentErrorType where
CONN e -> "CONN " <> bshow e
SMP e -> "SMP " <> strEncode e
NTF e -> "NTF " <> strEncode e
XFTP e -> "XFTP " <> strEncode e
BROKER srv (RESPONSE e) -> "BROKER " <> text srv <> " RESPONSE " <> text e
BROKER srv (TRANSPORT e) -> "BROKER " <> text srv <> " TRANSPORT " <> serializeTransportError e
BROKER srv e -> "BROKER " <> text srv <> " " <> bshow e
@@ -1282,6 +1291,7 @@ instance APartyI p => StrEncoding (ACommandTag p) where
OK_ -> "OK"
ERR_ -> "ERR"
SUSPENDED_ -> "SUSPENDED"
FRCVD_ -> "FRCVD"
strP = (\(ACmdTag _ t) -> checkParty t) <$?> strP
checkParty :: forall t p p'. (APartyI p, APartyI p') => t p' -> Either String (t p)
@@ -1332,6 +1342,7 @@ commandP binaryP =
OK_ -> pure OK
ERR_ -> s (ERR <$> strP)
SUSPENDED_ -> pure SUSPENDED
FRCVD_ -> s (FRCVD <$> A.decimal <* A.space <*> strP)
where
s :: Parser a -> Parser a
s p = A.space *> p
@@ -1385,6 +1396,7 @@ serializeCommand = \case
ERR e -> s (ERR_, e)
OK -> s OK_
SUSPENDED -> s SUSPENDED_
FRCVD fId fPath -> s (FRCVD_, Str $ bshow fId, fPath)
where
s :: StrEncoding a => a -> ByteString
s = strEncode
+4
View File
@@ -535,4 +535,8 @@ data StoreError
SEX3dhKeysNotFound
| -- | Used to wrap agent errors inside store operations to avoid race conditions
SEAgentError AgentErrorType
| -- | XFTP Server not found.
SEXFTPServerNotFound
| -- | XFTP File not found.
SEFileNotFound
deriving (Eq, Show, Exception)
+190 -1
View File
@@ -123,6 +123,16 @@ module Simplex.Messaging.Agent.Store.SQLite
getActiveNtfToken,
getNtfRcvQueue,
setConnectionNtfs,
-- File transfer
createRcvFile,
getRcvFile,
updateRcvFileChunkReceived,
updateRcvFileStatus,
updateRcvFileComplete,
updateRcvFileChunkReplicaRetries,
getNextRcvChunkToDownload,
getNextRcvFileToDecrypt,
getUnreceivedRcvFiles,
-- * utilities
withConnection,
@@ -155,6 +165,7 @@ import Data.Text (Text)
import qualified Data.Text as T
import Data.Text.Encoding (decodeLatin1, encodeUtf8)
import Data.Time.Clock (UTCTime, getCurrentTime)
import Data.Word (Word32)
import Database.SQLite.Simple (FromRow, NamedParam (..), Only (..), Query (..), SQLError, ToRow, field, (:.) (..))
import qualified Database.SQLite.Simple as DB
import Database.SQLite.Simple.FromField
@@ -162,6 +173,9 @@ import Database.SQLite.Simple.QQ (sql)
import Database.SQLite.Simple.ToField (ToField (..))
import qualified Database.SQLite3 as SQLite3
import Network.Socket (ServiceName)
import Simplex.FileTransfer.Description
import Simplex.FileTransfer.Protocol (FileParty (..))
import Simplex.FileTransfer.Types
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.SQLite.Migrations (Migration)
@@ -173,7 +187,7 @@ import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Notifications.Protocol (DeviceToken (..), NtfSubscriptionId, NtfTknStatus (..), NtfTokenId, SMPQueueNtf (..))
import Simplex.Messaging.Notifications.Types
import Simplex.Messaging.Parsers (blobFieldParser, fromTextField_)
import Simplex.Messaging.Protocol (MsgBody, MsgFlags, NtfServer, ProtocolServer (..), RcvNtfDhSecret, SndPublicVerifyKey, pattern NtfServer)
import Simplex.Messaging.Protocol
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Transport.Client (TransportHost)
import Simplex.Messaging.Util (bshow, eitherToMaybe, ($>>=), (<$$>))
@@ -1703,3 +1717,178 @@ randomId gVar n = U.encode <$> (atomically . stateTVar gVar $ randomBytesGenerat
ntfSubAndSMPAction :: NtfSubAction -> (Maybe NtfSubNTFAction, Maybe NtfSubSMPAction)
ntfSubAndSMPAction (NtfSubNTFAction action) = (Just action, Nothing)
ntfSubAndSMPAction (NtfSubSMPAction action) = (Nothing, Just action)
createXFTPServer_ :: DB.Connection -> XFTPServer -> IO Int64
createXFTPServer_ db newSrv@ProtocolServer {host, port, keyHash} =
getXFTPServerId_ db newSrv >>= \case
Right srvId -> pure srvId
Left _ -> insertNewServer_
where
insertNewServer_ = do
DB.execute db "INSERT INTO xftp_servers (xftp_host, xftp_port, xftp_key_hash) VALUES (?,?,?)" (host, port, keyHash)
insertedRowId db
getXFTPServerId_ :: DB.Connection -> XFTPServer -> IO (Either StoreError Int64)
getXFTPServerId_ db ProtocolServer {host, port, keyHash} = do
firstRow fromOnly SEXFTPServerNotFound $
DB.query db "SELECT xftp_server_id FROM xftp_servers WHERE xftp_host = ? AND xftp_port = ? AND xftp_key_hash = ?" (host, port, keyHash)
createRcvFile :: DB.Connection -> UserId -> FileDescription 'FPRecipient -> FilePath -> FilePath -> IO RcvFileId
createRcvFile db userId fd@FileDescription {chunks} saveDir tmpPath = do
rcvFileId <- insertRcvFile fd
forM_ chunks $ \fc@FileChunk {replicas} -> do
chunkId <- insertChunk fc rcvFileId
forM_ (zip [1 ..] replicas) $ \(rno, replica) -> insertReplica rno replica chunkId
pure rcvFileId
where
insertRcvFile FileDescription {size, digest, key, nonce, chunkSize} = do
DB.execute
db
"INSERT INTO rcv_files (user_id, size, digest, key, nonce, chunk_size, tmp_path, save_dir, status) VALUES (?,?,?,?,?,?,?,?,?)"
(userId, size, digest, key, nonce, chunkSize, tmpPath, saveDir, RFSReceiving)
insertedRowId db
insertChunk FileChunk {chunkNo, chunkSize, digest} rcvFileId = do
DB.execute
db
"INSERT INTO rcv_file_chunks (rcv_file_id, chunk_no, chunk_size, digest) VALUES (?,?,?,?)"
(rcvFileId, chunkNo, chunkSize, digest)
insertedRowId db
insertReplica :: Int -> FileChunkReplica -> Int64 -> IO ()
insertReplica replicaNo FileChunkReplica {server, replicaId, replicaKey} chunkId = do
srvId <- createXFTPServer_ db server
DB.execute
db
"INSERT INTO rcv_file_chunk_replicas (replica_number, rcv_file_chunk_id, xftp_server_id, replica_id, replica_key) VALUES (?,?,?,?,?)"
(replicaNo, chunkId, srvId, replicaId, replicaKey)
getRcvFile :: DB.Connection -> RcvFileId -> IO (Either StoreError RcvFile)
getRcvFile db rcvFileId = runExceptT $ do
fd@RcvFile {userId, tmpPath} <- ExceptT getFile
chunks <- liftIO $ getChunks userId tmpPath
pure (fd {chunks} :: RcvFile)
where
getFile :: IO (Either StoreError RcvFile)
getFile = do
firstRow toFile SEFileNotFound $
DB.query
db
[sql|
SELECT user_id, size, digest, key, nonce, chunk_size, tmp_path, save_dir, save_path, status
FROM rcv_files
WHERE rcv_file_id = ?
|]
(Only rcvFileId)
where
toFile :: (UserId, FileSize Int64, FileDigest, C.SbKey, C.CbNonce, FileSize Word32, FilePath, FilePath, Maybe FilePath, RcvFileStatus) -> RcvFile
toFile (userId, size, digest, key, nonce, chunkSize, tmpPath, saveDir, savePath, status) =
RcvFile {userId, rcvFileId, size, digest, key, nonce, chunkSize, tmpPath, saveDir, savePath, status, chunks = []}
getChunks :: UserId -> FilePath -> IO [RcvFileChunk]
getChunks userId fileTmpPath = do
chunks <-
map toChunk
<$> DB.query
db
[sql|
SELECT rcv_file_chunk_id, chunk_no, chunk_size, digest, tmp_path, next_delay
FROM rcv_file_chunks
WHERE rcv_file_id = ?
|]
(Only rcvFileId)
forM chunks $ \chunk@RcvFileChunk {rcvChunkId} -> do
replicas' <- getChunkReplicas rcvChunkId
pure (chunk {replicas = replicas'} :: RcvFileChunk)
where
toChunk :: (Int64, Int, FileSize Word32, FileDigest, Maybe FilePath, Maybe Int) -> RcvFileChunk
toChunk (rcvChunkId, chunkNo, chunkSize, digest, chunkTmpPath, nextDelay) =
RcvFileChunk {userId, rcvFileId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, nextDelay, replicas = []}
getChunkReplicas :: Int64 -> IO [RcvFileChunkReplica]
getChunkReplicas chunkId = do
map toReplica
<$> DB.query
db
[sql|
SELECT
r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.acknowledged, r.retries,
s.xftp_host, s.xftp_port, s.xftp_key_hash
FROM rcv_file_chunk_replicas r
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
WHERE r.rcv_file_chunk_id = ?
|]
(Only chunkId)
where
toReplica :: (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Bool, Int, NonEmpty TransportHost, ServiceName, C.KeyHash) -> RcvFileChunkReplica
toReplica (rcvChunkReplicaId, replicaId, replicaKey, received, acknowledged, retries, host, port, keyHash) =
let server = XFTPServer host port keyHash
in RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, acknowledged, retries}
updateRcvFileChunkReceived :: DB.Connection -> Int64 -> Int64 -> RcvFileId -> FilePath -> IO (Either StoreError RcvFile)
updateRcvFileChunkReceived db rId cId fId chunkTmpPath = do
updatedAt <- getCurrentTime
DB.execute db "UPDATE rcv_file_chunk_replicas SET received = 1, updated_at = ? WHERE rcv_file_chunk_replica_id = ?" (updatedAt, rId)
DB.execute db "UPDATE rcv_file_chunks SET tmp_path = ?, updated_at = ? WHERE rcv_file_chunk_id = ?" (chunkTmpPath, updatedAt, cId)
getRcvFile db fId
updateRcvFileStatus :: DB.Connection -> RcvFileId -> RcvFileStatus -> IO ()
updateRcvFileStatus db rcvFileId status = do
updatedAt <- getCurrentTime
DB.execute db "UPDATE rcv_files SET status = ?, updated_at = ? WHERE rcv_file_id = ?" (status, updatedAt, rcvFileId)
updateRcvFileComplete :: DB.Connection -> RcvFileId -> FilePath -> IO ()
updateRcvFileComplete db rcvFileId savePath = do
updatedAt <- getCurrentTime
DB.execute db "UPDATE rcv_files SET save_path = ?, status = ?, updated_at = ? WHERE rcv_file_id = ?" (savePath, RFSComplete, updatedAt, rcvFileId)
updateRcvFileChunkReplicaRetries :: DB.Connection -> Int64 -> IO ()
updateRcvFileChunkReplicaRetries _db _replicaId = do
-- update rcv_file_chunk_replicas
undefined
getNextRcvChunkToDownload :: DB.Connection -> XFTPServer -> IO (Maybe RcvFileChunk)
getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} = do
maybeFirstRow toChunk $
DB.query
db
[sql|
SELECT
f.user_id, f.rcv_file_id, c.rcv_file_chunk_id, c.chunk_no, c.chunk_size, c.digest, f.tmp_path, c.tmp_path, c.next_delay,
r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.acknowledged, r.retries
FROM rcv_file_chunk_replicas r
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
JOIN rcv_file_chunks c ON c.rcv_file_chunk_id = r.rcv_file_chunk_id
JOIN rcv_files f ON f.rcv_file_id = c.rcv_file_id
WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ?
AND r.received = 0 AND r.replica_number = 1
ORDER BY r.created_at ASC
LIMIT 1
|]
(host, port, keyHash)
where
toChunk :: ((UserId, RcvFileId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath, Maybe Int) :. (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Bool, Int)) -> RcvFileChunk
toChunk ((userId, rcvFileId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, nextDelay) :. (rcvChunkReplicaId, replicaId, replicaKey, received, acknowledged, retries)) =
RcvFileChunk
{ userId,
rcvFileId,
rcvChunkId,
chunkNo,
chunkSize,
digest,
fileTmpPath,
chunkTmpPath,
nextDelay,
replicas = [RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, acknowledged, retries}]
}
getNextRcvFileToDecrypt :: DB.Connection -> IO (Maybe RcvFile)
getNextRcvFileToDecrypt db = do
fileId_ :: Maybe RcvFileId <-
maybeFirstRow fromOnly $
DB.query db "SELECT rcv_file_id FROM rcv_files WHERE status = ? ORDER BY created_at ASC LIMIT 1" (Only RFSReceived)
case fileId_ of
Nothing -> pure Nothing
Just fileId -> eitherToMaybe <$> getRcvFile db fileId
getUnreceivedRcvFiles :: DB.Connection -> IO [RcvFile]
getUnreceivedRcvFiles _db = do
-- get unique file ids from rcv_files where status /= complete
-- getRcvFile for each file id
undefined
@@ -41,6 +41,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230110_users
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230117_fkey_indexes
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230120_delete_errors
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230217_server_key_hash
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230223_files
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Transport.Client (TransportHost)
@@ -61,7 +62,8 @@ schemaMigrations =
("m20230110_users", m20230110_users),
("m20230117_fkey_indexes", m20230117_fkey_indexes),
("m20230120_delete_errors", m20230120_delete_errors),
("m20230217_server_key_hash", m20230217_server_key_hash)
("m20230217_server_key_hash", m20230217_server_key_hash),
("m20230223_files", m20230223_files)
]
-- | The list of migrations in ascending order by date
@@ -0,0 +1,69 @@
{-# LANGUAGE QuasiQuotes #-}
module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230223_files where
import Database.SQLite.Simple (Query)
import Database.SQLite.Simple.QQ (sql)
m20230223_files :: Query
m20230223_files =
[sql|
CREATE TABLE xftp_servers (
xftp_server_id INTEGER PRIMARY KEY,
xftp_host TEXT NOT NULL,
xftp_port TEXT NOT NULL,
xftp_key_hash BLOB NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now')),
UNIQUE(xftp_host, xftp_port, xftp_key_hash)
);
CREATE TABLE rcv_files (
rcv_file_id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE,
size INTEGER NOT NULL,
digest BLOB NOT NULL,
key BLOB NOT NULL,
nonce BLOB NOT NULL,
chunk_size INTEGER NOT NULL,
tmp_path TEXT NOT NULL,
save_dir TEXT NOT NULL,
save_path TEXT,
status TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX idx_rcv_files_user_id ON rcv_files(user_id);
CREATE TABLE rcv_file_chunks (
rcv_file_chunk_id INTEGER PRIMARY KEY,
rcv_file_id INTEGER NOT NULL REFERENCES rcv_files ON DELETE CASCADE,
chunk_no INTEGER NOT NULL,
chunk_size INTEGER NOT NULL,
digest BLOB NOT NULL,
tmp_path TEXT,
next_delay INTEGER,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX idx_rcv_file_chunks_rcv_file_id ON rcv_file_chunks(rcv_file_id);
CREATE TABLE rcv_file_chunk_replicas (
rcv_file_chunk_replica_id INTEGER PRIMARY KEY,
rcv_file_chunk_id INTEGER NOT NULL REFERENCES rcv_file_chunks ON DELETE CASCADE,
replica_number INTEGER NOT NULL,
xftp_server_id INTEGER NOT NULL REFERENCES xftp_servers ON DELETE CASCADE,
replica_id BLOB NOT NULL,
replica_key BLOB NOT NULL,
received INTEGER NOT NULL DEFAULT 0,
acknowledged INTEGER NOT NULL DEFAULT 0,
retries INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX idx_rcv_file_chunk_replicas_rcv_file_chunk_id ON rcv_file_chunk_replicas(rcv_file_chunk_id);
CREATE INDEX idx_rcv_file_chunk_replicas_xftp_server_id ON rcv_file_chunk_replicas(xftp_server_id);
|]
@@ -283,3 +283,59 @@ CREATE INDEX idx_snd_messages_conn_id_internal_id ON snd_messages(
internal_id
);
CREATE INDEX idx_snd_queues_host_port ON snd_queues(host, port);
CREATE TABLE xftp_servers(
xftp_server_id INTEGER PRIMARY KEY,
xftp_host TEXT NOT NULL,
xftp_port TEXT NOT NULL,
xftp_key_hash BLOB NOT NULL,
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now')),
UNIQUE(xftp_host, xftp_port, xftp_key_hash)
);
CREATE TABLE rcv_files(
rcv_file_id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE,
size INTEGER NOT NULL,
digest BLOB NOT NULL,
key BLOB NOT NULL,
nonce BLOB NOT NULL,
chunk_size INTEGER NOT NULL,
tmp_path TEXT NOT NULL,
save_dir TEXT NOT NULL,
save_path TEXT,
status TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
CREATE INDEX idx_rcv_files_user_id ON rcv_files(user_id);
CREATE TABLE rcv_file_chunks(
rcv_file_chunk_id INTEGER PRIMARY KEY,
rcv_file_id INTEGER NOT NULL REFERENCES rcv_files ON DELETE CASCADE,
chunk_no INTEGER NOT NULL,
chunk_size INTEGER NOT NULL,
digest BLOB NOT NULL,
tmp_path TEXT,
next_delay INTEGER,
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
CREATE INDEX idx_rcv_file_chunks_rcv_file_id ON rcv_file_chunks(rcv_file_id);
CREATE TABLE rcv_file_chunk_replicas(
rcv_file_chunk_replica_id INTEGER PRIMARY KEY,
rcv_file_chunk_id INTEGER NOT NULL REFERENCES rcv_file_chunks ON DELETE CASCADE,
replica_number INTEGER NOT NULL,
xftp_server_id INTEGER NOT NULL REFERENCES xftp_servers ON DELETE CASCADE,
replica_id BLOB NOT NULL,
replica_key BLOB NOT NULL,
received INTEGER NOT NULL DEFAULT 0,
acknowledged INTEGER NOT NULL DEFAULT 0,
retries INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
CREATE INDEX idx_rcv_file_chunk_replicas_rcv_file_chunk_id ON rcv_file_chunk_replicas(
rcv_file_chunk_id
);
CREATE INDEX idx_rcv_file_chunk_replicas_xftp_server_id ON rcv_file_chunk_replicas(
xftp_server_id
);
+3 -3
View File
@@ -31,7 +31,7 @@ module Simplex.Messaging.Client
SMPClient,
getProtocolClient,
closeProtocolClient,
clientServer,
protocolClientServer,
transportHost',
transportSession',
@@ -254,8 +254,8 @@ chooseTransportHost NetworkConfig {socksProxy, hostMode, requiredHostMode} hosts
onionHost = find isOnionHost hosts
publicHost = find (not . isOnionHost) hosts
clientServer :: ProtocolTypeI (ProtoType msg) => ProtocolClient err msg -> String
clientServer = B.unpack . strEncode . snd3 . transportSession . client_
protocolClientServer :: ProtocolTypeI (ProtoType msg) => ProtocolClient err msg -> String
protocolClientServer = B.unpack . strEncode . snd3 . transportSession . client_
where
snd3 (_, s, _) = s
+8
View File
@@ -1032,6 +1032,10 @@ instance ToJSON CbNonce where
instance FromJSON CbNonce where
parseJSON = strParseJSON "CbNonce"
instance FromField CbNonce where fromField f = CryptoBoxNonce <$> fromField f
instance ToField CbNonce where toField (CryptoBoxNonce s) = toField s
cbNonce :: ByteString -> CbNonce
cbNonce s
| len == 24 = CryptoBoxNonce s
@@ -1072,6 +1076,10 @@ instance ToJSON SbKey where
instance FromJSON SbKey where
parseJSON = strParseJSON "SbKey"
instance FromField SbKey where fromField f = SecretBoxKey <$> fromField f
instance ToField SbKey where toField (SecretBoxKey s) = toField s
sbKey :: ByteString -> Either String SbKey
sbKey s
| B.length s == 32 = Right $ SecretBoxKey s
+6 -1
View File
@@ -32,6 +32,7 @@ import Simplex.Messaging.Agent.Server (runSMPAgentBlocking)
import Simplex.Messaging.Agent.Store (UserId)
import Simplex.Messaging.Client (ProtocolClientConfig (..), chooseTransportHost, defaultClientConfig, defaultNetworkConfig)
import Simplex.Messaging.Parsers (parseAll)
import Simplex.Messaging.Protocol (ProtoServerWithAuth, XFTPServer)
import Simplex.Messaging.Transport
import Simplex.Messaging.Transport.Client
import Test.Hspec
@@ -173,11 +174,15 @@ testSMPServer = "smp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:50
testSMPServer2 :: SMPServer
testSMPServer2 = "smp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:5002"
testXFTPServer :: XFTPServer
testXFTPServer = "xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7001"
initAgentServers :: InitialAgentServers
initAgentServers =
InitialAgentServers
{ smp = userServers [noAuthSrv testSMPServer],
ntf = ["ntf://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:6001"],
xftp = userServers [noAuthSrv testXFTPServer],
netCfg = defaultNetworkConfig {tcpTimeout = 500_000, tcpConnectTimeout = 500_000}
}
@@ -208,7 +213,7 @@ withSmpAgentThreadOn_ t (port', smpPort', db') afterProcess =
(\started -> runSMPAgentBlocking t started cfg' initServers')
afterProcess
userServers :: NonEmpty SMPServerWithAuth -> Map UserId (NonEmpty SMPServerWithAuth)
userServers :: NonEmpty (ProtoServerWithAuth p) -> Map UserId (NonEmpty (ProtoServerWithAuth p))
userServers srvs = M.fromList [(1, srvs)]
withSmpAgentThreadOn :: (MonadUnliftIO m, MonadRandom m) => ATransport -> (ServiceName, ServiceName, AgentDatabase) -> (ThreadId -> m a) -> m a
+2
View File
@@ -16,6 +16,7 @@ import Simplex.Messaging.Transport.WebSockets (WS)
import System.Directory (createDirectoryIfMissing, removeDirectoryRecursive)
import System.Environment (setEnv)
import Test.Hspec
import XFTPAgent
import XFTPCLI
import XFTPServerTests (xftpServerTests)
@@ -46,4 +47,5 @@ main = do
describe "XFTP server" xftpServerTests
describe "XFTP file description" fileDescriptionTests
describe "XFTP CLI" xftpCLITests
describe "XFTP agent" xftpAgentTests
describe "Server CLIs" cliTests
+59
View File
@@ -0,0 +1,59 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module XFTPAgent where
import AgentTests.FunctionalAPITests (get, runRight_)
import Control.Monad.Except
import Data.Bifunctor (first)
import qualified Data.ByteString as LB
import SMPAgentClient (agentCfg, initAgentServers)
import Simplex.FileTransfer.Description
import Simplex.FileTransfer.Protocol (FileParty (..), checkParty)
import Simplex.Messaging.Agent (getSMPAgentClient, xftpReceiveFile)
import Simplex.Messaging.Agent.Protocol (ACommand (FRCVD), AgentErrorType (..))
import Simplex.Messaging.Encoding.String (StrEncoding (..))
import System.Directory (getFileSize)
import System.FilePath ((</>))
import Test.Hspec
import XFTPCLI
import XFTPClient
xftpAgentTests :: Spec
xftpAgentTests = around_ testBracket . describe "Functional API" $ do
it "should receive file" testXFTPAgentReceive
testXFTPAgentReceive :: IO ()
testXFTPAgentReceive = withXFTPServer $ do
-- send file using CLI
let filePath = senderFiles </> "testfile"
xftpCLI ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath]
file <- LB.readFile filePath
getFileSize filePath `shouldReturn` mb 17
let fdRcv = filePath <> ".xftp" </> "rcv1.xftp"
fdSnd = filePath <> ".xftp" </> "snd.xftp.private"
progress : sendResult <- xftpCLI ["send", filePath, senderFiles, "-s", testXFTPServerStr, "--tmp=tests/tmp"]
progress `shouldSatisfy` uploadProgress
sendResult
`shouldBe` [ "Sender file description: " <> fdSnd,
"Pass file descriptions to the recipient(s):",
fdRcv
]
-- receive file using agent
rcp <- getSMPAgentClient agentCfg initAgentServers
runRight_ $ do
fd :: ValidFileDescription 'FPRecipient <- getFileDescription fdRcv
fId <- xftpReceiveFile rcp 1 fd recipientFiles
("", "", FRCVD fId' path) <- get rcp
liftIO $ do
fId' `shouldBe` fId
LB.readFile path `shouldReturn` file
where
getFileDescription :: FilePath -> ExceptT AgentErrorType IO (ValidFileDescription 'FPRecipient)
getFileDescription path = do
fd :: AFileDescription <- ExceptT $ first (INTERNAL . ("Failed to parse file description: " <>)) . strDecode <$> LB.readFile path
vfd <- liftEither . first INTERNAL $ validateFileDescription fd
case vfd of
AVFD fd' -> either (throwError . INTERNAL) pure $ checkParty fd'
+18 -19
View File
@@ -33,16 +33,19 @@ senderFiles = "tests/tmp/xftp-sender-files"
recipientFiles :: FilePath
recipientFiles = "tests/tmp/xftp-recipient-files"
xftpCLI :: [String] -> IO [String]
xftpCLI params = lines <$> capture_ (withArgs params xftpClientCLI)
testXFTPCLISendReceive :: IO ()
testXFTPCLISendReceive = withXFTPServer $ do
let filePath = senderFiles </> "testfile"
xftp ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath]
xftpCLI ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath]
file <- LB.readFile filePath
getFileSize filePath `shouldReturn` mb 17
let fdRcv1 = filePath <> ".xftp" </> "rcv1.xftp"
fdRcv2 = filePath <> ".xftp" </> "rcv2.xftp"
fdSnd = filePath <> ".xftp" </> "snd.xftp.private"
progress : sendResult <- xftp ["send", filePath, senderFiles, "-n", "2", "-s", testXFTPServerStr, "--tmp=tests/tmp"]
progress : sendResult <- xftpCLI ["send", filePath, senderFiles, "-n", "2", "-s", testXFTPServerStr, "--tmp=tests/tmp"]
progress `shouldSatisfy` uploadProgress
sendResult
`shouldBe` [ "Sender file description: " <> fdSnd,
@@ -55,15 +58,14 @@ testXFTPCLISendReceive = withXFTPServer $ do
testInfoFile fdRcv2 "Recipient"
testReceiveFile fdRcv2 "testfile_1" file
testInfoFile fdSnd "Sender"
xftp ["recv", fdSnd, recipientFiles, "--tmp=tests/tmp"]
xftpCLI ["recv", fdSnd, recipientFiles, "--tmp=tests/tmp"]
`shouldThrow` anyException
where
xftp params = lines <$> capture_ (withArgs params xftpClientCLI)
testInfoFile fd party = do
xftp ["info", fd]
xftpCLI ["info", fd]
`shouldReturn` [party <> " file description", "File download size: 18mb", "File server(s):", testXFTPServerStr <> ": 18mb"]
testReceiveFile fd fileName file = do
progress : recvResult <- xftp ["recv", fd, recipientFiles, "--tmp=tests/tmp", "-y"]
progress : recvResult <- xftpCLI ["recv", fd, recipientFiles, "--tmp=tests/tmp", "-y"]
progress `shouldSatisfy` downloadProgress fileName
recvResult `shouldBe` ["File description " <> fd <> " is deleted."]
LB.readFile (recipientFiles </> fileName) `shouldReturn` file
@@ -71,13 +73,13 @@ testXFTPCLISendReceive = withXFTPServer $ do
testXFTPCLISendReceive2servers :: IO ()
testXFTPCLISendReceive2servers = withXFTPServer . withXFTPServer2 $ do
let filePath = senderFiles </> "testfile"
xftp ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath]
xftpCLI ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath]
file <- LB.readFile filePath
getFileSize filePath `shouldReturn` mb 17
let fdRcv1 = filePath <> ".xftp" </> "rcv1.xftp"
fdRcv2 = filePath <> ".xftp" </> "rcv2.xftp"
fdSnd = filePath <> ".xftp" </> "snd.xftp.private"
progress : sendResult <- xftp ["send", filePath, senderFiles, "-n", "2", "-s", testXFTPServerStr <> ";" <> testXFTPServerStr2, "--tmp=tests/tmp"]
progress : sendResult <- xftpCLI ["send", filePath, senderFiles, "-n", "2", "-s", testXFTPServerStr <> ";" <> testXFTPServerStr2, "--tmp=tests/tmp"]
progress `shouldSatisfy` uploadProgress
sendResult
`shouldBe` [ "Sender file description: " <> fdSnd,
@@ -88,9 +90,8 @@ testXFTPCLISendReceive2servers = withXFTPServer . withXFTPServer2 $ do
testReceiveFile fdRcv1 "testfile" file
testReceiveFile fdRcv2 "testfile_1" file
where
xftp params = lines <$> capture_ (withArgs params xftpClientCLI)
testReceiveFile fd fileName file = do
partyStr : sizeStr : srvStr : srvs <- xftp ["info", fd]
partyStr : sizeStr : srvStr : srvs <- xftpCLI ["info", fd]
partyStr `shouldContain` "Recipient file description"
sizeStr `shouldBe` "File download size: 18mb"
srvStr `shouldBe` "File server(s):"
@@ -100,7 +101,7 @@ testXFTPCLISendReceive2servers = withXFTPServer . withXFTPServer2 $ do
srv1 `shouldContain` testXFTPServerStr
srv2 `shouldContain` testXFTPServerStr2
_ -> print srvs >> error "more than 2 servers returned"
progress : recvResult <- xftp ["recv", fd, recipientFiles, "--tmp=tests/tmp", "-y"]
progress : recvResult <- xftpCLI ["recv", fd, recipientFiles, "--tmp=tests/tmp", "-y"]
progress `shouldSatisfy` downloadProgress fileName
recvResult `shouldBe` ["File description " <> fd <> " is deleted."]
LB.readFile (recipientFiles </> fileName) `shouldReturn` file
@@ -108,13 +109,13 @@ testXFTPCLISendReceive2servers = withXFTPServer . withXFTPServer2 $ do
testXFTPCLIDelete :: IO ()
testXFTPCLIDelete = withXFTPServer . withXFTPServer2 $ do
let filePath = senderFiles </> "testfile"
xftp ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath]
xftpCLI ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath]
file <- LB.readFile filePath
getFileSize filePath `shouldReturn` mb 17
let fdRcv1 = filePath <> ".xftp" </> "rcv1.xftp"
fdRcv2 = filePath <> ".xftp" </> "rcv2.xftp"
fdSnd = filePath <> ".xftp" </> "snd.xftp.private"
progress : sendResult <- xftp ["send", filePath, senderFiles, "-n", "2", "-s", testXFTPServerStr <> ";" <> testXFTPServerStr2, "--tmp=tests/tmp"]
progress : sendResult <- xftpCLI ["send", filePath, senderFiles, "-n", "2", "-s", testXFTPServerStr <> ";" <> testXFTPServerStr2, "--tmp=tests/tmp"]
progress `shouldSatisfy` uploadProgress
sendResult
`shouldBe` [ "Sender file description: " <> fdSnd,
@@ -122,23 +123,21 @@ testXFTPCLIDelete = withXFTPServer . withXFTPServer2 $ do
fdRcv1,
fdRcv2
]
xftp ["del", fdRcv1]
xftpCLI ["del", fdRcv1]
`shouldThrow` anyException
progress1 : recvResult <- xftp ["recv", fdRcv1, recipientFiles, "--tmp=tests/tmp", "-y"]
progress1 : recvResult <- xftpCLI ["recv", fdRcv1, recipientFiles, "--tmp=tests/tmp", "-y"]
progress1 `shouldSatisfy` downloadProgress "testfile"
recvResult `shouldBe` ["File description " <> fdRcv1 <> " is deleted."]
LB.readFile (recipientFiles </> "testfile") `shouldReturn` file
fs1 <- listDirectory xftpServerFiles
fs2 <- listDirectory xftpServerFiles2
length fs1 + length fs2 `shouldBe` 6
xftp ["del", fdSnd, "-y"]
xftpCLI ["del", fdSnd, "-y"]
`shouldReturn` ["File deleted! \r", "File description " <> fdSnd <> " is deleted."]
listDirectory xftpServerFiles >>= (`shouldBe` [])
listDirectory xftpServerFiles2 >>= (`shouldBe` [])
xftp ["recv", fdRcv2, recipientFiles, "--tmp=tests/tmp"]
xftpCLI ["recv", fdRcv2, recipientFiles, "--tmp=tests/tmp"]
`shouldThrow` anyException
where
xftp params = lines <$> capture_ (withArgs params xftpClientCLI)
testPrepareChunkSizes :: IO ()
testPrepareChunkSizes = do
+1 -1
View File
@@ -110,6 +110,6 @@ testXFTPClientConfig = defaultXFTPClientConfig
testXFTPClient :: HasCallStack => (HasCallStack => XFTPClient -> IO a) -> IO a
testXFTPClient client =
getXFTPClient (1, testXFTPServer, Nothing) testXFTPClientConfig (pure ()) >>= \case
getXFTPClient (1, testXFTPServer, Nothing) testXFTPClientConfig (\_ -> pure ()) >>= \case
Right c -> client c
Left e -> error $ show e