Merge pull request #709 from simplex-chat/xftp-send

xftp: recoverable send
This commit is contained in:
spaced4ndy
2023-04-13 20:13:04 +04:00
committed by GitHub
16 changed files with 1440 additions and 386 deletions
+399 -128
View File
@@ -19,7 +19,9 @@ module Simplex.FileTransfer.Agent
deleteRcvFile,
-- Sending files
sendFileExperimental,
_sendFile,
sendFile,
deleteSndFileInternal,
deleteSndFileRemote,
)
where
@@ -33,18 +35,22 @@ import Data.Bifunctor (first)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Base64.URL as U
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as LB
import Data.Composition ((.:))
import Data.Int (Int64)
import Data.List (foldl', isSuffixOf, partition)
import Data.List (foldl', isSuffixOf, partition, sortOn)
import Data.List.NonEmpty (nonEmpty)
import qualified Data.List.NonEmpty as L
import Data.Map (Map)
import qualified Data.Map.Strict as M
import Data.Text (Text)
import Data.Time.Clock (getCurrentTime)
import Data.Time.Format (defaultTimeLocale, formatTime)
import Simplex.FileTransfer.Client.Main (CLIError, SendOptions (..), cliSendFileOpts)
import Simplex.FileTransfer.Client (XFTPChunkSpec (..))
import Simplex.FileTransfer.Client.Main
import Simplex.FileTransfer.Crypto
import Simplex.FileTransfer.Description
import Simplex.FileTransfer.Protocol (FileParty (..), FilePartyI)
import Simplex.FileTransfer.Protocol (FileParty (..), FilePartyI, SFileParty (..))
import Simplex.FileTransfer.Transport (XFTPRcvChunkSpec (..))
import Simplex.FileTransfer.Types
import Simplex.FileTransfer.Util (removePath, uniqueCombine)
@@ -53,10 +59,14 @@ import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Store.SQLite
import qualified Simplex.Messaging.Crypto as C
import qualified Simplex.Messaging.Crypto.Lazy as LC
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (EntityId, XFTPServer, XFTPServerWithAuth)
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util (liftError, liftIOEither, tshow, whenM)
import Simplex.Messaging.Util (bshow, liftError, liftIOEither, tshow, whenM)
import System.FilePath (takeFileName, (</>))
import UnliftIO
import UnliftIO.Concurrent
@@ -67,29 +77,42 @@ startWorkers :: AgentMonad m => AgentClient -> Maybe FilePath -> m ()
startWorkers c workDir = do
wd <- asks $ xftpWorkDir . xftpAgent
atomically $ writeTVar wd workDir
startFiles
startRcvFiles
startSndFiles
startDelFiles
where
startFiles = do
rcvFilesTTL <- asks (rcvFilesTTL . config)
startRcvFiles = do
rcvFilesTTL <- asks $ rcvFilesTTL . config
pendingRcvServers <- withStore' c (`getPendingRcvFilesServers` rcvFilesTTL)
forM_ pendingRcvServers $ \s -> addXFTPWorker c (Just s)
forM_ pendingRcvServers $ \s -> addXFTPRcvWorker c (Just s)
-- start local worker for files pending decryption,
-- no need to make an extra query for the check
-- as the worker will check the store anyway
addXFTPWorker c Nothing
addXFTPRcvWorker c Nothing
startSndFiles = do
sndFilesTTL <- asks $ sndFilesTTL . config
-- start worker for files pending encryption/creation
addXFTPSndWorker c Nothing
pendingSndServers <- withStore' c (`getPendingSndFilesServers` sndFilesTTL)
forM_ pendingSndServers $ \s -> addXFTPSndWorker c (Just s)
startDelFiles = do
rcvFilesTTL <- asks $ rcvFilesTTL . config
pendingDelServers <- withStore' c (`getPendingDelFilesServers` rcvFilesTTL)
forM_ pendingDelServers $ addXFTPDelWorker c
closeXFTPAgent :: MonadUnliftIO m => XFTPAgent -> m ()
closeXFTPAgent XFTPAgent {xftpWorkers} = do
ws <- atomically $ stateTVar xftpWorkers (,M.empty)
mapM_ (uninterruptibleCancel . snd) ws
closeXFTPAgent XFTPAgent {xftpRcvWorkers, xftpSndWorkers} = do
stopWorkers xftpRcvWorkers
stopWorkers xftpSndWorkers
where
stopWorkers wsSel = do
ws <- atomically $ stateTVar wsSel (,M.empty)
mapM_ (uninterruptibleCancel . snd) ws
receiveFile :: AgentMonad m => AgentClient -> UserId -> ValidFileDescription 'FRecipient -> m RcvFileId
receiveFile c userId (ValidFileDescription fd@FileDescription {chunks}) = do
g <- asks idsDrg
workPath <- getXFTPWorkPath
ts <- liftIO getCurrentTime
let isoTime = formatTime defaultTimeLocale "%Y%m%d_%H%M%S_%6q" ts
prefixPath <- uniqueCombine workPath (isoTime <> "_rcv.xftp")
prefixPath <- getPrefixPath "rcv.xftp"
createDirectory prefixPath
let relPrefixPath = takeFileName prefixPath
relTmpPath = relPrefixPath </> "xftp.encrypted"
@@ -102,9 +125,16 @@ receiveFile c userId (ValidFileDescription fd@FileDescription {chunks}) = do
where
downloadChunk :: AgentMonad m => FileChunk -> m ()
downloadChunk FileChunk {replicas = (FileChunkReplica {server} : _)} = do
addXFTPWorker c (Just server)
addXFTPRcvWorker c (Just server)
downloadChunk _ = throwError $ INTERNAL "no replicas"
getPrefixPath :: AgentMonad m => String -> m FilePath
getPrefixPath suffix = do
workPath <- getXFTPWorkPath
ts <- liftIO getCurrentTime
let isoTime = formatTime defaultTimeLocale "%Y%m%d_%H%M%S_%6q" ts
uniqueCombine workPath (isoTime <> "_" <> suffix)
toFSFilePath :: AgentMonad m => FilePath -> m FilePath
toFSFilePath f = (</> f) <$> getXFTPWorkPath
@@ -113,73 +143,77 @@ createEmptyFile fPath = do
h <- openFile fPath AppendMode
liftIO $ B.hPut h "" >> hFlush h
addXFTPWorker :: AgentMonad m => AgentClient -> Maybe XFTPServer -> m ()
addXFTPWorker c srv_ = do
ws <- asks $ xftpWorkers . xftpAgent
addXFTPRcvWorker :: AgentMonad m => AgentClient -> Maybe XFTPServer -> m ()
addXFTPRcvWorker c = addWorker c xftpRcvWorkers runXFTPRcvWorker runXFTPRcvLocalWorker
addWorker ::
AgentMonad m =>
AgentClient ->
(XFTPAgent -> TMap (Maybe XFTPServer) (TMVar (), Async ())) ->
(AgentClient -> XFTPServer -> TMVar () -> m ()) ->
(AgentClient -> TMVar () -> m ()) ->
Maybe XFTPServer ->
m ()
addWorker c wsSel runWorker runWorkerNoSrv srv_ = do
ws <- asks $ wsSel . xftpAgent
atomically (TM.lookup srv_ ws) >>= \case
Nothing -> do
doWork <- newTMVarIO ()
let runWorker = case srv_ of
Just srv -> runXFTPWorker c srv doWork
Nothing -> runXFTPLocalWorker c doWork
worker <- async $ runWorker `E.finally` atomically (TM.delete srv_ ws)
let runWorker' = case srv_ of
Just srv -> runWorker c srv doWork
Nothing -> runWorkerNoSrv c doWork
worker <- async $ runWorker' `E.finally` atomically (TM.delete srv_ ws)
atomically $ TM.insert srv_ (doWork, worker) ws
Just (doWork, _) ->
void . atomically $ tryPutTMVar doWork ()
runXFTPWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m ()
runXFTPWorker c srv doWork = do
runXFTPRcvWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m ()
runXFTPRcvWorker c srv doWork = do
forever $ do
void . atomically $ readTMVar doWork
agentOperationBracket c AORcvNetwork waitUntilActive runXftpOperation
atomically $ checkAgentForeground c
runXFTPOperation
where
noWorkToDo = void . atomically $ tryTakeTMVar doWork
runXftpOperation :: m ()
runXftpOperation = do
runXFTPOperation :: m ()
runXFTPOperation = do
rcvFilesTTL <- asks (rcvFilesTTL . config)
nextChunk <- withStore' c $ \db -> getNextRcvChunkToDownload db srv rcvFilesTTL
case nextChunk of
Nothing -> noWorkToDo
Just RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []} -> workerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) "chunk has no replicas"
Just fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, delay} : _} -> do
Just RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []} -> rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) "chunk has no replicas"
Just fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, rcvChunkId, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _} -> do
ri <- asks $ reconnectInterval . config
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
withRetryInterval ri' $ \delay' loop ->
downloadFileChunk fc replica
`catchError` retryOnError delay' loop (workerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) . show)
`catchError` \e -> retryOnError "XFTP rcv worker" (retryLoop loop e delay') (retryDone e) e
where
retryOnError :: Int64 -> m () -> (AgentErrorType -> m ()) -> AgentErrorType -> m ()
retryOnError replicaDelay loop done e = do
logError $ "XFTP worker error: " <> tshow e
if temporaryAgentError e
then retryLoop
else done e
where
retryLoop = do
notifyOnRetry <- asks (xftpNotifyErrsOnRetry . config)
when notifyOnRetry $ notifyInternalError c rcvFileEntityId $ show e
closeXFTPServerClient c userId replica
withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay
atomically $ endAgentOperation c AORcvNetwork
atomically $ throwWhenInactive c
atomically $ beginAgentOperation c AORcvNetwork
loop
retryLoop loop e replicaDelay = do
flip catchError (\_ -> pure ()) $ do
notifyOnRetry <- asks (xftpNotifyErrsOnRetry . config)
when notifyOnRetry $ notify c rcvFileEntityId $ RFERR e
closeXFTPServerClient c userId server $ bshow rcvChunkId
withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay
atomically $ checkAgentForeground c
loop
retryDone e = rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) (show e)
downloadFileChunk :: RcvFileChunk -> RcvFileChunkReplica -> m ()
downloadFileChunk RcvFileChunk {userId, rcvFileId, rcvFileEntityId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath} replica = do
fsFileTmpPath <- toFSFilePath fileTmpPath
chunkPath <- uniqueCombine fsFileTmpPath $ show chunkNo
let chunkSpec = XFTPRcvChunkSpec chunkPath (unFileSize chunkSize) (unFileDigest digest)
relChunkPath = fileTmpPath </> takeFileName chunkPath
agentXFTPDownloadChunk c userId replica chunkSpec
agentXFTPDownloadChunk c userId rcvChunkId replica chunkSpec
(complete, progress) <- withStore c $ \db -> runExceptT $ do
RcvFile {size = FileSize total, chunks} <-
ExceptT $ updateRcvFileChunkReceived db (rcvChunkReplicaId replica) rcvChunkId rcvFileId relChunkPath
liftIO $ updateRcvFileChunkReceived db (rcvChunkReplicaId replica) rcvChunkId relChunkPath
RcvFile {size = FileSize total, chunks} <- ExceptT $ getRcvFile db rcvFileId
let rcvd = receivedSize chunks
complete = all chunkReceived chunks
liftIO . when complete $ updateRcvFileStatus db rcvFileId RFSReceived
pure (complete, RFPROG rcvd total)
liftIO $ notify c rcvFileEntityId progress
when complete $ addXFTPWorker c Nothing
notify c rcvFileEntityId progress
when complete $ addXFTPRcvWorker c Nothing
where
receivedSize :: [RcvFileChunk] -> Int64
receivedSize = foldl' (\sz ch -> sz + receivedChunkSize ch) 0
@@ -188,30 +222,34 @@ runXFTPWorker c srv doWork = do
| otherwise = 0
chunkReceived RcvFileChunk {replicas} = any received replicas
workerInternalError :: AgentMonad m => AgentClient -> DBRcvFileId -> RcvFileId -> Maybe FilePath -> String -> m ()
workerInternalError c rcvFileId rcvFileEntityId tmpPath internalErrStr = do
retryOnError :: AgentMonad m => Text -> m a -> m a -> AgentErrorType -> m a
retryOnError name loop done e = do
logError $ name <> " error: " <> tshow e
if temporaryAgentError e
then loop
else done
rcvWorkerInternalError :: AgentMonad m => AgentClient -> DBRcvFileId -> RcvFileId -> Maybe FilePath -> String -> m ()
rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath internalErrStr = do
forM_ tmpPath (removePath <=< toFSFilePath)
withStore' c $ \db -> updateRcvFileError db rcvFileId internalErrStr
notifyInternalError c rcvFileEntityId internalErrStr
notify c rcvFileEntityId $ RFERR $ INTERNAL internalErrStr
notifyInternalError :: (MonadUnliftIO m) => AgentClient -> RcvFileId -> String -> m ()
notifyInternalError AgentClient {subQ} rcvFileEntityId internalErrStr = atomically $ writeTBQueue subQ ("", rcvFileEntityId, APC SAERcvFile $ RFERR $ INTERNAL internalErrStr)
runXFTPLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m ()
runXFTPLocalWorker c doWork = do
runXFTPRcvLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m ()
runXFTPRcvLocalWorker c doWork = do
forever $ do
void . atomically $ readTMVar doWork
-- TODO agentOperationBracket?
runXftpOperation
atomically $ checkAgentForeground c
runXFTPOperation
where
runXftpOperation :: m ()
runXftpOperation = do
runXFTPOperation :: m ()
runXFTPOperation = do
rcvFilesTTL <- asks (rcvFilesTTL . config)
nextFile <- withStore' c (`getNextRcvFileToDecrypt` rcvFilesTTL)
case nextFile of
Nothing -> noWorkToDo
Just f@RcvFile {rcvFileId, rcvFileEntityId, tmpPath} ->
decryptFile f `catchError` (workerInternalError c rcvFileId rcvFileEntityId tmpPath . show)
decryptFile f `catchError` (rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath . show)
noWorkToDo = void . atomically $ tryTakeTMVar doWork
decryptFile :: RcvFile -> m ()
decryptFile RcvFile {rcvFileId, rcvFileEntityId, key, nonce, tmpPath, savePath, status, chunks} = do
@@ -222,9 +260,9 @@ runXFTPLocalWorker c doWork = do
chunkPaths <- getChunkPaths chunks
encSize <- liftIO $ foldM (\s path -> (s +) . fromIntegral <$> getFileSize path) 0 chunkPaths
void $ liftError (INTERNAL . show) $ decryptChunks encSize chunkPaths key nonce $ \_ -> pure fsSavePath
notify c rcvFileEntityId $ RFDONE fsSavePath
forM_ tmpPath (removePath <=< toFSFilePath)
withStore' c (`updateRcvFileComplete` rcvFileId)
liftIO $ notify c rcvFileEntityId $ RFDONE fsSavePath
where
getChunkPaths :: [RcvFileChunk] -> m [FilePath]
getChunkPaths [] = pure []
@@ -264,7 +302,7 @@ sendFileExperimental c@AgentClient {xftpServers} userId filePath numRecipients =
createDirectoryIfMissing False tempPath
runSend fileName outputDir tempPath `catchError` \e -> do
cleanup outputDir tempPath
liftIO $ notify c sndFileId $ SFERR e
notify c sndFileId $ SFERR e
where
runSend :: String -> FilePath -> FilePath -> m ()
runSend fileName outputDir tempPath = do
@@ -281,7 +319,7 @@ sendFileExperimental c@AgentClient {xftpServers} userId filePath numRecipients =
liftCLI $ cliSendFileOpts sendOptions False $ notify c sndFileId .: SFPROG
(sndDescr, rcvDescrs) <- readDescrs outputDir fileName
cleanup outputDir tempPath
liftIO $ notify c sndFileId $ SFDONE sndDescr rcvDescrs
notify c sndFileId $ SFDONE sndDescr rcvDescrs
cleanup :: FilePath -> FilePath -> m ()
cleanup outputDir tempPath = do
removePath tempPath
@@ -299,71 +337,304 @@ sendFileExperimental c@AgentClient {xftpServers} userId filePath numRecipients =
readDescr :: FilePartyI p => FilePath -> m (ValidFileDescription p)
readDescr f = liftIOEither $ first INTERNAL . strDecode <$> B.readFile f
notify :: forall e. AEntityI e => AgentClient -> EntityId -> ACommand 'Agent e -> IO ()
notify :: forall m e. (MonadUnliftIO m, AEntityI e) => AgentClient -> EntityId -> ACommand 'Agent e -> m ()
notify c entId cmd = atomically $ writeTBQueue (subQ c) ("", entId, APC (sAEntity @e) cmd)
-- _sendFile :: AgentMonad m => AgentClient -> UserId -> FilePath -> Int -> m SndFileId
_sendFile :: AgentClient -> UserId -> FilePath -> Int -> m SndFileId
_sendFile _c _userId _filePath _numRecipients = do
-- db: create file in status New without chunks
-- add local snd worker for encryption
-- return file id to client
undefined
sendFile :: AgentMonad m => AgentClient -> UserId -> FilePath -> Int -> m SndFileId
sendFile c userId filePath numRecipients = do
g <- asks idsDrg
prefixPath <- getPrefixPath "snd.xftp"
createDirectory prefixPath
let relPrefixPath = takeFileName prefixPath
key <- liftIO C.randomSbKey
nonce <- liftIO C.randomCbNonce
-- saving absolute filePath will not allow to restore file encryption after app update, but it's a short window
fId <- withStore c $ \db -> createSndFile db g userId numRecipients filePath relPrefixPath key nonce
addXFTPSndWorker c Nothing
pure fId
_runXFTPSndLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m ()
_runXFTPSndLocalWorker _c doWork = do
addXFTPSndWorker :: AgentMonad m => AgentClient -> Maybe XFTPServer -> m ()
addXFTPSndWorker c = addWorker c xftpSndWorkers runXFTPSndWorker runXFTPSndPrepareWorker
runXFTPSndPrepareWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m ()
runXFTPSndPrepareWorker c doWork = do
forever $ do
void . atomically $ readTMVar doWork
runXftpOperation
atomically $ checkAgentForeground c
runXFTPOperation
where
runXftpOperation :: m ()
runXftpOperation = do
-- db: get next snd file to encrypt (in status New)
-- ? (or Encrypted to retry create? - see below)
-- with fixed retries (?) encryptFile
undefined
_encryptFile :: SndFile -> m ()
_encryptFile _sndFile = do
-- if enc path exists, remove it
-- if enc path doesn't exist:
-- - choose enc path
-- - touch file, db: update enc path (?)
-- calculate chunk sizes, encrypt file to enc path
-- calculate digest
-- prepare chunk specs
-- db:
-- - update file status to Encrypted
-- - create chunks according to chunk specs
-- ? since which servers are online is unknown,
-- ? we can't blindly assign servers to replicas.
-- ? should we XFTP create chunks on servers here,
-- ? with retrying for different servers,
-- ? keeping a list of servers that were tried?
-- ? then we can add replicas to chunks in db
-- ? and update file status to Uploading,
-- ? probably in same transaction as creating chunks,
-- ? and add XFTP snd workers for uploading chunks.
undefined
runXFTPOperation :: m ()
runXFTPOperation = do
sndFilesTTL <- asks (sndFilesTTL . config)
nextFile <- withStore' c (`getNextSndFileToPrepare` sndFilesTTL)
case nextFile of
Nothing -> noWorkToDo
Just f@SndFile {sndFileId, sndFileEntityId, prefixPath} ->
prepareFile f `catchError` (sndWorkerInternalError c sndFileId sndFileEntityId prefixPath . show)
noWorkToDo = void . atomically $ tryTakeTMVar doWork
prepareFile :: SndFile -> m ()
prepareFile SndFile {prefixPath = Nothing} =
throwError $ INTERNAL "no prefix path"
prepareFile sndFile@SndFile {sndFileId, userId, prefixPath = Just ppath, status} = do
SndFile {numRecipients, chunks} <-
if status /= SFSEncrypted -- status is SFSNew or SFSEncrypting
then do
fsEncPath <- toFSFilePath $ sndFileEncPath ppath
when (status == SFSEncrypting) $
whenM (doesFileExist fsEncPath) $ removeFile fsEncPath
withStore' c $ \db -> updateSndFileStatus db sndFileId SFSEncrypting
(digest, chunkSpecsDigests) <- encryptFileForUpload sndFile fsEncPath
withStore c $ \db -> do
updateSndFileEncrypted db sndFileId digest chunkSpecsDigests
getSndFile db sndFileId
else pure sndFile
maxRecipients <- asks (xftpMaxRecipientsPerRequest . config)
let numRecipients' = min numRecipients maxRecipients
-- concurrently?
forM_ (filter (not . chunkCreated) chunks) $ createChunk numRecipients'
withStore' c $ \db -> updateSndFileStatus db sndFileId SFSUploading
where
encryptFileForUpload :: SndFile -> FilePath -> m (FileDigest, [(XFTPChunkSpec, FileDigest)])
encryptFileForUpload SndFile {key, nonce, filePath} fsEncPath = do
let fileName = takeFileName filePath
fileSize <- fromInteger <$> getFileSize filePath
when (fileSize > maxFileSize) $ throwError $ INTERNAL "max file size exceeded"
let fileHdr = smpEncode FileHeader {fileName, fileExtra = Nothing}
fileSize' = fromIntegral (B.length fileHdr) + fileSize
chunkSizes = prepareChunkSizes $ fileSize' + fileSizeLen + authTagSize
chunkSizes' = map fromIntegral chunkSizes
encSize = sum chunkSizes'
void $ liftError (INTERNAL . show) $ encryptFile filePath fileHdr key nonce fileSize' encSize fsEncPath
digest <- liftIO $ LC.sha512Hash <$> LB.readFile fsEncPath
let chunkSpecs = prepareChunkSpecs fsEncPath chunkSizes
chunkDigests <- map FileDigest <$> mapM (liftIO . getChunkDigest) chunkSpecs
pure (FileDigest digest, zip chunkSpecs chunkDigests)
chunkCreated :: SndFileChunk -> Bool
chunkCreated SndFileChunk {replicas} =
any (\SndFileChunkReplica {replicaStatus} -> replicaStatus == SFRSCreated) replicas
createChunk :: Int -> SndFileChunk -> m ()
createChunk numRecipients' ch = do
atomically $ checkAgentForeground c
(replica, ProtoServerWithAuth srv _) <- agentOperationBracket c AOSndNetwork throwWhenInactive tryCreate
withStore' c $ \db -> createSndFileReplica db ch replica
addXFTPSndWorker c $ Just srv
where
tryCreate = do
ri <- asks $ messageRetryInterval . config
usedSrvs <- newTVarIO ([] :: [XFTPServer])
withRetryInterval (riFast ri) $ \_ loop ->
createWithNextSrv usedSrvs
`catchError` \e -> retryOnError "XFTP prepare worker" (retryLoop loop) (throwError e) e
where
retryLoop loop = atomically (checkAgentForeground c) >> loop
createWithNextSrv usedSrvs = do
deleted <- withStore' c $ \db -> getSndFileDeleted db sndFileId
when deleted $ throwError $ INTERNAL "file deleted, aborting chunk creation"
withNextSrv c userId usedSrvs [] $ \srvAuth -> do
replica <- agentXFTPNewChunk c ch numRecipients' srvAuth
pure (replica, srvAuth)
_runXFTPSndWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m ()
_runXFTPSndWorker c _srv doWork = do
sndWorkerInternalError :: AgentMonad m => AgentClient -> DBSndFileId -> SndFileId -> Maybe FilePath -> String -> m ()
sndWorkerInternalError c sndFileId sndFileEntityId prefixPath internalErrStr = do
forM_ prefixPath $ removePath <=< toFSFilePath
withStore' c $ \db -> updateSndFileError db sndFileId internalErrStr
notify c sndFileEntityId $ SFERR $ INTERNAL internalErrStr
runXFTPSndWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m ()
runXFTPSndWorker c srv doWork = do
forever $ do
void . atomically $ readTMVar doWork
agentOperationBracket c AOSndNetwork throwWhenInactive runXftpOperation
atomically $ checkAgentForeground c
agentOperationBracket c AOSndNetwork throwWhenInactive runXFTPOperation
where
runXftpOperation :: m ()
runXftpOperation = do
-- db: get next snd chunk to upload (replica is not uploaded)
-- with retry interval uploadChunk
-- - with fixed retries, repeat N times:
-- check if other files are in upload, delay (see xftpSndFiles in XFTPAgent)
undefined
_uploadFileChunk :: SndFileChunk -> m ()
_uploadFileChunk _sndFileChunk = do
-- add file id to xftpSndFiles
-- XFTP upload chunk
-- db: update replica status to Uploaded, return SndFile
-- if all SndFile's replicas are uploaded:
-- - serialize file descriptions and notify client
-- - remove file id from xftpSndFiles
undefined
noWorkToDo = void . atomically $ tryTakeTMVar doWork
runXFTPOperation :: m ()
runXFTPOperation = do
sndFilesTTL <- asks (sndFilesTTL . config)
nextChunk <- withStore' c $ \db -> getNextSndChunkToUpload db srv sndFilesTTL
case nextChunk of
Nothing -> noWorkToDo
Just SndFileChunk {sndFileId, sndFileEntityId, filePrefixPath, replicas = []} -> sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) "chunk has no replicas"
Just fc@SndFileChunk {userId, sndFileId, sndChunkId, sndFileEntityId, filePrefixPath, replicas = replica@SndFileChunkReplica {sndChunkReplicaId, server, delay} : _} -> do
ri <- asks $ reconnectInterval . config
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
withRetryInterval ri' $ \delay' loop ->
uploadFileChunk fc replica
`catchError` \e -> retryOnError "XFTP snd worker" (retryLoop loop e delay') (retryDone e) e
where
retryLoop loop e replicaDelay = do
flip catchError (\_ -> pure ()) $ do
notifyOnRetry <- asks (xftpNotifyErrsOnRetry . config)
when notifyOnRetry $ notify c sndFileEntityId $ SFERR e
closeXFTPServerClient c userId server $ bshow sndChunkId
withStore' c $ \db -> updateSndChunkReplicaDelay db sndChunkReplicaId replicaDelay
atomically $ checkAgentForeground c
loop
retryDone e = sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) (show e)
uploadFileChunk :: SndFileChunk -> SndFileChunkReplica -> m ()
uploadFileChunk sndFileChunk@SndFileChunk {sndFileId, sndChunkId, userId, chunkSpec = chunkSpec@XFTPChunkSpec {filePath}} replica = do
replica'@SndFileChunkReplica {sndChunkReplicaId} <- addRecipients sndFileChunk replica
fsFilePath <- toFSFilePath filePath
let chunkSpec' = chunkSpec {filePath = fsFilePath} :: XFTPChunkSpec
atomically $ checkAgentForeground c
agentXFTPUploadChunk c userId sndChunkId replica' chunkSpec'
sf@SndFile {sndFileEntityId, prefixPath, chunks} <- withStore c $ \db -> do
updateSndChunkReplicaStatus db sndChunkReplicaId SFRSUploaded
getSndFile db sndFileId
let uploaded = uploadedSize chunks
total = totalSize chunks
complete = all chunkUploaded chunks
notify c sndFileEntityId $ SFPROG uploaded total
when complete $ do
(sndDescr, rcvDescrs) <- sndFileToDescrs sf
notify c sndFileEntityId $ SFDONE sndDescr rcvDescrs
forM_ prefixPath $ removePath <=< toFSFilePath
withStore' c $ \db -> updateSndFileComplete db sndFileId
where
addRecipients :: SndFileChunk -> SndFileChunkReplica -> m SndFileChunkReplica
addRecipients ch@SndFileChunk {numRecipients} cr@SndFileChunkReplica {rcvIdsKeys}
| length rcvIdsKeys > numRecipients = throwError $ INTERNAL "too many recipients"
| length rcvIdsKeys == numRecipients = pure cr
| otherwise = do
maxRecipients <- asks $ xftpMaxRecipientsPerRequest . config
let numRecipients' = min (numRecipients - length rcvIdsKeys) maxRecipients
rcvIdsKeys' <- agentXFTPAddRecipients c userId sndChunkId cr numRecipients'
cr' <- withStore' c $ \db -> addSndChunkReplicaRecipients db cr $ L.toList rcvIdsKeys'
addRecipients ch cr'
sndFileToDescrs :: SndFile -> m (ValidFileDescription 'FSender, [ValidFileDescription 'FRecipient])
sndFileToDescrs SndFile {digest = Nothing} = throwError $ INTERNAL "snd file has no digest"
sndFileToDescrs SndFile {chunks = []} = throwError $ INTERNAL "snd file has no chunks"
sndFileToDescrs SndFile {digest = Just digest, key, nonce, chunks = chunks@(fstChunk : _)} = do
let chunkSize = FileSize $ sndChunkSize fstChunk
size = FileSize $ sum $ map (fromIntegral . sndChunkSize) chunks
-- snd description
sndDescrChunks <- mapM toSndDescrChunk chunks
let fdSnd = FileDescription {party = SFSender, size, digest, key, nonce, chunkSize, chunks = sndDescrChunks}
validFdSnd <- either (throwError . INTERNAL) pure $ validateFileDescription fdSnd
-- rcv descriptions
let fdRcv = FileDescription {party = SFRecipient, size, digest, key, nonce, chunkSize, chunks = []}
fdRcvs = createRcvFileDescriptions fdRcv chunks
validFdRcvs <- either (throwError . INTERNAL) pure $ mapM validateFileDescription fdRcvs
pure (validFdSnd, validFdRcvs)
toSndDescrChunk :: SndFileChunk -> m FileChunk
toSndDescrChunk SndFileChunk {replicas = []} = throwError $ INTERNAL "snd file chunk has no replicas"
toSndDescrChunk ch@SndFileChunk {chunkNo, digest = chDigest, replicas = (SndFileChunkReplica {server, replicaId, replicaKey} : _)} = do
let chunkSize = FileSize $ sndChunkSize ch
replicas = [FileChunkReplica {server, replicaId, replicaKey}]
pure FileChunk {chunkNo, digest = chDigest, chunkSize, replicas}
createRcvFileDescriptions :: FileDescription 'FRecipient -> [SndFileChunk] -> [FileDescription 'FRecipient]
createRcvFileDescriptions fd sndChunks = map (\chunks -> (fd :: (FileDescription 'FRecipient)) {chunks}) rcvChunks
where
rcvReplicas :: [SentRecipientReplica]
rcvReplicas = concatMap toSentRecipientReplicas sndChunks
toSentRecipientReplicas :: SndFileChunk -> [SentRecipientReplica]
toSentRecipientReplicas ch@SndFileChunk {chunkNo, digest, replicas} =
let chunkSize = FileSize $ sndChunkSize ch
in concatMap
( \SndFileChunkReplica {server, rcvIdsKeys} ->
zipWith
(\rcvNo (replicaId, replicaKey) -> SentRecipientReplica {chunkNo, server, rcvNo, replicaId, replicaKey, digest, chunkSize})
[1 ..]
rcvIdsKeys
)
replicas
rcvChunks :: [[FileChunk]]
rcvChunks = map (sortChunks . M.elems) $ M.elems $ foldl' addRcvChunk M.empty rcvReplicas
sortChunks :: [FileChunk] -> [FileChunk]
sortChunks = map reverseReplicas . sortOn (chunkNo :: FileChunk -> Int)
reverseReplicas ch@FileChunk {replicas} = (ch :: FileChunk) {replicas = reverse replicas}
addRcvChunk :: Map Int (Map Int FileChunk) -> SentRecipientReplica -> Map Int (Map Int FileChunk)
addRcvChunk m SentRecipientReplica {chunkNo, server, rcvNo, replicaId, replicaKey, digest, chunkSize} =
M.alter (Just . addOrChangeRecipient) rcvNo m
where
addOrChangeRecipient :: Maybe (Map Int FileChunk) -> Map Int FileChunk
addOrChangeRecipient = \case
Just m' -> M.alter (Just . addOrChangeChunk) chunkNo m'
_ -> M.singleton chunkNo $ FileChunk {chunkNo, digest, chunkSize, replicas = [replica']}
addOrChangeChunk :: Maybe FileChunk -> FileChunk
addOrChangeChunk = \case
Just ch@FileChunk {replicas} -> ch {replicas = replica' : replicas}
_ -> FileChunk {chunkNo, digest, chunkSize, replicas = [replica']}
replica' = FileChunkReplica {server, replicaId, replicaKey}
uploadedSize :: [SndFileChunk] -> Int64
uploadedSize = foldl' (\sz ch -> sz + uploadedChunkSize ch) 0
uploadedChunkSize ch
| chunkUploaded ch = fromIntegral (sndChunkSize ch)
| otherwise = 0
totalSize :: [SndFileChunk] -> Int64
totalSize = foldl' (\sz ch -> sz + fromIntegral (sndChunkSize ch)) 0
chunkUploaded :: SndFileChunk -> Bool
chunkUploaded SndFileChunk {replicas} =
any (\SndFileChunkReplica {replicaStatus} -> replicaStatus == SFRSUploaded) replicas
deleteSndFileInternal :: AgentMonad m => AgentClient -> UserId -> SndFileId -> m ()
deleteSndFileInternal c userId sndFileEntityId = do
SndFile {sndFileId, prefixPath, status} <- withStore c $ \db -> getSndFileByEntityId db userId sndFileEntityId
if status == SFSComplete || status == SFSError
then do
forM_ prefixPath $ removePath <=< toFSFilePath
withStore' c (`deleteSndFile'` sndFileId)
else withStore' c (`updateSndFileDeleted` sndFileId)
deleteSndFileRemote :: forall m. AgentMonad m => AgentClient -> UserId -> SndFileId -> ValidFileDescription 'FSender -> m ()
deleteSndFileRemote c userId sndFileEntityId (ValidFileDescription FileDescription {chunks}) = do
deleteSndFileInternal c userId sndFileEntityId `catchError` (notify c sndFileEntityId . SFERR)
forM_ chunks $ \ch -> deleteFileChunk ch `catchError` (notify c sndFileEntityId . SFERR)
where
deleteFileChunk :: FileChunk -> m ()
deleteFileChunk FileChunk {replicas = replica@FileChunkReplica {server} : _} = do
withStore' c $ \db -> createDeletedSndChunkReplica db userId replica
addXFTPDelWorker c server
deleteFileChunk _ = pure ()
addXFTPDelWorker :: AgentMonad m => AgentClient -> XFTPServer -> m ()
addXFTPDelWorker c srv = do
ws <- asks $ xftpDelWorkers . xftpAgent
atomically (TM.lookup srv ws) >>= \case
Nothing -> do
doWork <- newTMVarIO ()
worker <- async $ runXFTPDelWorker c srv doWork `E.finally` atomically (TM.delete srv ws)
atomically $ TM.insert srv (doWork, worker) ws
Just (doWork, _) ->
void . atomically $ tryPutTMVar doWork ()
runXFTPDelWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m ()
runXFTPDelWorker c srv doWork = do
forever $ do
void . atomically $ readTMVar doWork
atomically $ checkAgentForeground c
runXFTPOperation
where
noWorkToDo = void . atomically $ tryTakeTMVar doWork
runXFTPOperation :: m ()
runXFTPOperation = do
-- no point in deleting files older than rcv ttl, as they will be expired on server
rcvFilesTTL <- asks (rcvFilesTTL . config)
nextReplica <- withStore' c $ \db -> getNextDeletedSndChunkReplica db srv rcvFilesTTL
case nextReplica of
Nothing -> noWorkToDo
Just replica@DeletedSndChunkReplica {deletedSndChunkReplicaId, userId, server, replicaId = ChunkReplicaId replId, delay} -> do
ri <- asks $ reconnectInterval . config
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
withRetryInterval ri' $ \delay' loop ->
deleteChunkReplica replica
`catchError` \e -> retryOnError "XFTP del worker" (retryLoop loop e delay') (retryDone e) e
where
retryLoop loop e replicaDelay = do
flip catchError (\_ -> pure ()) $ do
notifyOnRetry <- asks (xftpNotifyErrsOnRetry . config)
when notifyOnRetry $ notify c "" $ SFERR e
closeXFTPServerClient c userId server replId
withStore' c $ \db -> updateDeletedSndChunkReplicaDelay db deletedSndChunkReplicaId replicaDelay
atomically $ checkAgentForeground c
loop
retryDone e = delWorkerInternalError c deletedSndChunkReplicaId e
deleteChunkReplica :: DeletedSndChunkReplica -> m ()
deleteChunkReplica replica@DeletedSndChunkReplica {userId, deletedSndChunkReplicaId} = do
agentXFTPDeleteChunk c userId replica
withStore' c $ \db -> deleteDeletedSndChunkReplica db deletedSndChunkReplicaId
delWorkerInternalError :: AgentMonad m => AgentClient -> Int64 -> AgentErrorType -> m ()
delWorkerInternalError c deletedSndChunkReplicaId e = do
withStore' c $ \db -> deleteDeletedSndChunkReplica db deletedSndChunkReplicaId
notify c "" $ SFERR e
-2
View File
@@ -215,7 +215,5 @@ noFile HTTP2Body {bodyPart} a = case bodyPart of
Just _ -> pure a -- throwError $ PCEResponseError HAS_FILE
_ -> pure a
-- FADD :: NonEmpty RcvPublicVerifyKey -> FileCommand Sender
-- FDEL :: FileCommand Sender
-- FACK :: FileCommand Recipient
-- PING :: FileCommand Recipient
+18 -8
View File
@@ -16,6 +16,14 @@ module Simplex.FileTransfer.Client.Main
cliSendFile,
cliSendFileOpts,
prepareChunkSizes,
prepareChunkSpecs,
chunkSize1,
chunkSize2,
chunkSize3,
maxFileSize,
fileSizeLen,
getChunkDigest,
SentRecipientReplica (..),
)
where
@@ -26,6 +34,7 @@ import Control.Monad.Except
import Crypto.Random (getRandomBytes)
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.Bifunctor (first)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as LB
import Data.Char (toLower)
@@ -55,7 +64,7 @@ import qualified Simplex.Messaging.Crypto.Lazy as LC
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String (StrEncoding (..))
import Simplex.Messaging.Parsers (parseAll)
import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), SenderId, SndPrivateSignKey, SndPublicVerifyKey, XFTPServer, XFTPServerWithAuth)
import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), SenderId, SndPrivateSignKey, XFTPServer, XFTPServerWithAuth)
import Simplex.Messaging.Server.CLI (getCliCommand')
import Simplex.Messaging.Util (ifM, tshow, whenM)
import System.Exit (exitFailure)
@@ -319,7 +328,8 @@ cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, re
logInfo $ "uploading chunk " <> tshow chunkNo <> " to " <> showServer xftpServer <> "..."
(sndKey, spKey) <- liftIO $ C.generateSignatureKeyPair C.SEd25519
rKeys <- liftIO $ L.fromList <$> replicateM numRecipients (C.generateSignatureKeyPair C.SEd25519)
ch@FileInfo {digest} <- liftIO $ getChunkInfo sndKey chunkSpec
digest <- liftIO $ getChunkDigest chunkSpec
let ch = FileInfo {sndKey, size = fromIntegral chunkSize, digest}
c <- withRetry retryCount $ getXFTPServerClient a xftpServer
(sndId, rIds) <- withRetry retryCount $ createXFTPChunk c spKey ch (L.map fst rKeys) auth
withReconnect a xftpServer retryCount $ \c' -> uploadXFTPChunk c' spKey sndId chunkSpec
@@ -332,12 +342,6 @@ cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, re
let recipients = L.toList $ L.map ChunkReplicaId rIds `L.zip` L.map snd rKeys
replicas = [SentFileChunkReplica {server = xftpServer, recipients}]
pure (chunkNo, SentFileChunk {chunkNo, sndId, sndPrivateKey = spKey, chunkSize = FileSize $ fromIntegral chunkSize, digest = FileDigest digest, replicas})
getChunkInfo :: SndPublicVerifyKey -> XFTPChunkSpec -> IO FileInfo
getChunkInfo sndKey XFTPChunkSpec {filePath = chunkPath, chunkOffset, chunkSize} =
withFile chunkPath ReadMode $ \h -> do
hSeek h AbsoluteSeek $ fromIntegral chunkOffset
digest <- LC.sha256Hash <$> LB.hGet h (fromIntegral chunkSize)
pure FileInfo {sndKey, size = fromIntegral chunkSize, digest}
getXFTPServer :: TVar StdGen -> NonEmpty XFTPServerWithAuth -> IO XFTPServerWithAuth
getXFTPServer gen = \case
srv :| [] -> pure srv
@@ -406,6 +410,12 @@ cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, re
B.writeFile fdSndPath $ strEncode fdSnd
pure (fdRcvPaths, fdSndPath)
getChunkDigest :: XFTPChunkSpec -> IO ByteString
getChunkDigest XFTPChunkSpec {filePath = chunkPath, chunkOffset, chunkSize} =
withFile chunkPath ReadMode $ \h -> do
hSeek h AbsoluteSeek $ fromIntegral chunkOffset
LC.sha256Hash <$> LB.hGet h (fromIntegral chunkSize)
cliReceiveFile :: ReceiveOptions -> ExceptT CLIError IO ()
cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath, verbose, yes} =
getFileDescription' fileDescription >>= receiveFile
+9 -11
View File
@@ -4,7 +4,6 @@
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternSynonyms #-}
@@ -182,7 +181,7 @@ instance FilePartyI p => StrEncoding (ValidFileDescription p) where
instance StrEncoding AValidFileDescription where
strEncode (AVFD fd) = strEncode fd
strDecode = validateFileDescription <=< strDecode
strDecode = (\(AFD fd) -> AVFD <$> validateFileDescription fd) <=< strDecode
strP = strDecode <$?> A.takeByteString
instance FilePartyI p => StrEncoding (FileDescription p) where
@@ -195,15 +194,14 @@ instance StrEncoding AFileDescription where
strDecode = decodeFileDescription <=< first show . Y.decodeEither'
strP = strDecode <$?> A.takeByteString
validateFileDescription :: AFileDescription -> Either String AValidFileDescription
validateFileDescription = \case
AFD fd@FileDescription {size, chunks}
| chunkNos /= [1 .. length chunks] -> Left "chunk numbers are not sequential"
| chunksSize chunks /= unFileSize size -> Left "chunks total size is different than file size"
| otherwise -> Right $ AVFD (ValidFD fd)
where
chunkNos = map (chunkNo :: FileChunk -> Int) chunks
chunksSize = fromIntegral . foldl' (\s FileChunk {chunkSize} -> s + unFileSize chunkSize) 0
validateFileDescription :: FileDescription p -> Either String (ValidFileDescription p)
validateFileDescription fd@FileDescription {size, chunks}
| chunkNos /= [1 .. length chunks] = Left "chunk numbers are not sequential"
| chunksSize chunks /= unFileSize size = Left "chunks total size is different than file size"
| otherwise = Right $ ValidFD fd
where
chunkNos = map (chunkNo :: FileChunk -> Int) chunks
chunksSize = fromIntegral . foldl' (\s FileChunk {chunkSize} -> s + unFileSize chunkSize) 0
encodeFileDescription :: FileDescription p -> YAMLFileDescription
encodeFileDescription FileDescription {party, size, digest, key, nonce, chunkSize, chunks} =
+68 -25
View File
@@ -11,12 +11,13 @@ import Database.SQLite.Simple.FromField (FromField (..))
import Database.SQLite.Simple.ToField (ToField (..))
import Simplex.FileTransfer.Client (XFTPChunkSpec (..))
import Simplex.FileTransfer.Description
import Simplex.Messaging.Agent.Protocol (RcvFileId)
import Simplex.Messaging.Agent.Protocol (RcvFileId, SndFileId)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Parsers (fromTextField_)
import Simplex.Messaging.Protocol
import System.FilePath ((</>))
authTagSize :: Int64
authTagSize = fromIntegral C.authTagSize
@@ -111,26 +112,31 @@ data RcvFileChunkReplica = RcvFileChunkReplica
type DBSndFileId = Int64
data SndFile = SndFile
{ userId :: Int64,
sndFileId :: DBSndFileId,
size :: FileSize Int64,
digest :: FileDigest,
{ sndFileId :: DBSndFileId,
sndFileEntityId :: SndFileId,
userId :: Int64,
numRecipients :: Int,
digest :: Maybe FileDigest,
key :: C.SbKey,
nonce :: C.CbNonce,
chunkSize :: FileSize Word32,
chunks :: [RcvFileChunk],
path :: FilePath,
encPath :: Maybe FilePath,
status :: SndFileStatus
chunks :: [SndFileChunk],
filePath :: FilePath,
prefixPath :: Maybe FilePath,
status :: SndFileStatus,
deleted :: Bool
}
deriving (Eq, Show)
sndFileEncPath :: FilePath -> FilePath
sndFileEncPath prefixPath = prefixPath </> "xftp.encrypted"
data SndFileStatus
= SFSNew
| SFSEncrypting
| SFSEncrypted
| SFSUploading
| SFSComplete
= SFSNew -- db record created
| SFSEncrypting -- encryption started
| SFSEncrypted -- encryption complete
| SFSUploading -- all chunk replicas are created on servers
| SFSComplete -- all chunk replicas are uploaded
| SFSError -- permanent error
deriving (Eq, Show)
instance FromField SndFileStatus where fromField = fromTextField_ textDecode
@@ -144,6 +150,7 @@ instance TextEncoding SndFileStatus where
"encrypted" -> Just SFSEncrypted
"uploading" -> Just SFSUploading
"complete" -> Just SFSComplete
"error" -> Just SFSError
_ -> Nothing
textEncode = \case
SFSNew -> "new"
@@ -151,16 +158,30 @@ instance TextEncoding SndFileStatus where
SFSEncrypted -> "encrypted"
SFSUploading -> "uploading"
SFSComplete -> "complete"
SFSError -> "error"
data SndFileChunk = SndFileChunk
{ userId :: Int64,
sndFileId :: DBSndFileId,
{ sndFileId :: DBSndFileId,
sndFileEntityId :: SndFileId,
userId :: Int64,
numRecipients :: Int,
sndChunkId :: Int64,
chunkNo :: Int,
chunkSpec :: XFTPChunkSpec,
filePrefixPath :: FilePath,
digest :: FileDigest,
replicas :: [SndFileChunkReplica],
delay :: Maybe Int
replicas :: [SndFileChunkReplica]
}
deriving (Eq, Show)
sndChunkSize :: SndFileChunk -> Word32
sndChunkSize SndFileChunk {chunkSpec = XFTPChunkSpec {chunkSize}} = chunkSize
data NewSndChunkReplica = NewSndChunkReplica
{ server :: XFTPServer,
replicaId :: ChunkReplicaId,
replicaKey :: C.APrivateSignKey,
rcvIdsKeys :: [(ChunkReplicaId, C.APrivateSignKey)]
}
deriving (Eq, Show)
@@ -170,15 +191,37 @@ data SndFileChunkReplica = SndFileChunkReplica
replicaId :: ChunkReplicaId,
replicaKey :: C.APrivateSignKey,
rcvIdsKeys :: [(ChunkReplicaId, C.APrivateSignKey)],
-- created :: Bool,
uploaded :: Bool,
replicaStatus :: SndFileReplicaStatus,
delay :: Maybe Int64,
retries :: Int
}
deriving (Eq, Show)
-- to be used in reply to client
data SndFileDescription = SndFileDescription
{ description :: String,
sender :: Bool
data SndFileReplicaStatus
= SFRSCreated
| SFRSUploaded
deriving (Eq, Show)
instance FromField SndFileReplicaStatus where fromField = fromTextField_ textDecode
instance ToField SndFileReplicaStatus where toField = toField . textEncode
instance TextEncoding SndFileReplicaStatus where
textDecode = \case
"created" -> Just SFRSCreated
"uploaded" -> Just SFRSUploaded
_ -> Nothing
textEncode = \case
SFRSCreated -> "created"
SFRSUploaded -> "uploaded"
data DeletedSndChunkReplica = DeletedSndChunkReplica
{ deletedSndChunkReplicaId :: Int64,
userId :: Int64,
server :: XFTPServer,
replicaId :: ChunkReplicaId,
replicaKey :: C.APrivateSignKey,
delay :: Maybe Int64,
retries :: Int
}
deriving (Eq, Show)
+51 -52
View File
@@ -84,7 +84,9 @@ module Simplex.Messaging.Agent
xftpReceiveFile,
xftpDeleteRcvFile,
xftpSendFile,
activateAgent,
xftpDeleteSndFileInternal,
xftpDeleteSndFileRemote,
foregroundAgent,
suspendAgent,
execAgentStoreSQL,
getAgentMigrations,
@@ -107,7 +109,7 @@ import qualified Data.ByteString.Char8 as B
import Data.Composition ((.:), (.:.), (.::))
import Data.Foldable (foldl')
import Data.Functor (($>))
import Data.List (deleteFirstsBy, find, (\\))
import Data.List (find)
import Data.List.NonEmpty (NonEmpty (..), (<|))
import qualified Data.List.NonEmpty as L
import Data.Map.Strict (Map)
@@ -118,7 +120,7 @@ import qualified Data.Text as T
import Data.Time.Clock
import Data.Time.Clock.System (systemToUTCTime)
import qualified Database.SQLite.Simple as DB
import Simplex.FileTransfer.Agent (closeXFTPAgent, deleteRcvFile, receiveFile, sendFileExperimental, startWorkers, toFSFilePath)
import Simplex.FileTransfer.Agent (closeXFTPAgent, deleteRcvFile, deleteSndFileInternal, deleteSndFileRemote, receiveFile, sendFile, startWorkers, toFSFilePath)
import Simplex.FileTransfer.Description (ValidFileDescription)
import Simplex.FileTransfer.Protocol (FileParty (..))
import Simplex.FileTransfer.Util (removePath)
@@ -140,12 +142,11 @@ import Simplex.Messaging.Notifications.Protocol (DeviceToken, NtfRegCode (NtfReg
import Simplex.Messaging.Notifications.Server.Push.APNS (PNMessageData (..))
import Simplex.Messaging.Notifications.Types
import Simplex.Messaging.Parsers (parse)
import Simplex.Messaging.Protocol (BrokerMsg, EntityId, ErrorType (AUTH), MsgBody, MsgFlags, NtfServer, ProtoServerWithAuth, ProtocolTypeI (..), SMPMsgMeta, SProtocolType (..), SndPublicVerifyKey, UserProtocol, XFTPServerWithAuth, protoServer, sameSrvAddr')
import Simplex.Messaging.Protocol (BrokerMsg, EntityId, ErrorType (AUTH), MsgBody, MsgFlags, NtfServer, ProtoServerWithAuth, ProtocolTypeI (..), SMPMsgMeta, SProtocolType (..), SndPublicVerifyKey, UserProtocol, XFTPServerWithAuth)
import qualified Simplex.Messaging.Protocol as SMP
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util
import Simplex.Messaging.Version
import System.Random (randomR)
import UnliftIO.Async (async, race_)
import UnliftIO.Concurrent (forkFinally, forkIO, threadDelay)
import qualified UnliftIO.Exception as E
@@ -349,11 +350,19 @@ xftpDeleteRcvFile c = withAgentEnv c .: deleteRcvFile c
-- | Send XFTP file
xftpSendFile :: AgentErrorMonad m => AgentClient -> UserId -> FilePath -> Int -> m SndFileId
xftpSendFile c = withAgentEnv c .:. sendFileExperimental c
xftpSendFile c = withAgentEnv c .:. sendFile c
-- | Delete XFTP snd file internally (deletes work files from file system and db records)
xftpDeleteSndFileInternal :: AgentErrorMonad m => AgentClient -> UserId -> SndFileId -> m ()
xftpDeleteSndFileInternal c = withAgentEnv c .: deleteSndFileInternal c
-- | Delete XFTP snd file chunks on servers
xftpDeleteSndFileRemote :: AgentErrorMonad m => AgentClient -> UserId -> SndFileId -> ValidFileDescription 'FSender -> m ()
xftpDeleteSndFileRemote c = withAgentEnv c .:. deleteSndFileRemote c
-- | Activate operations
activateAgent :: MonadUnliftIO m => AgentClient -> m ()
activateAgent c = withAgentEnv c $ activateAgent' c
foregroundAgent :: MonadUnliftIO m => AgentClient -> m ()
foregroundAgent c = withAgentEnv c $ foregroundAgent' c
-- | Suspend operations with max delay to deliver pending messages
suspendAgent :: MonadUnliftIO m => AgentClient -> Int -> m ()
@@ -551,7 +560,7 @@ joinConn :: AgentMonad m => AgentClient -> UserId -> ConnId -> Bool -> Bool -> C
joinConn c userId connId asyncMode enableNtfs cReq cInfo = do
srv <- case cReq of
CRInvitationUri ConnReqUriData {crSmpQueues = q :| _} _ ->
getNextSMPServer c userId [qServer q]
getNextServer c userId [qServer q]
_ -> getSMPServer c userId
joinConnSrv c userId connId asyncMode enableNtfs cReq cInfo srv
@@ -847,13 +856,13 @@ runCommandProcessing c@AgentClient {subQ} server_ = do
AClientCommand (APC _ cmd) -> case cmd of
NEW enableNtfs (ACM cMode) -> noServer $ do
usedSrvs <- newTVarIO ([] :: [SMPServer])
tryCommand . withNextSrv usedSrvs [] $ \srv -> do
tryCommand . withNextSrv c userId usedSrvs [] $ \srv -> do
(_, cReq) <- newRcvConnSrv c userId connId enableNtfs cMode Nothing srv
notify $ INV (ACR cMode cReq)
JOIN enableNtfs (ACR _ cReq@(CRInvitationUri ConnReqUriData {crSmpQueues = q :| _} _)) connInfo -> noServer $ do
let initUsed = [qServer q]
usedSrvs <- newTVarIO initUsed
tryCommand . withNextSrv usedSrvs initUsed $ \srv -> do
tryCommand . withNextSrv c userId usedSrvs initUsed $ \srv -> do
void $ joinConnSrv c userId connId True enableNtfs cReq connInfo srv
notify OK
LET confId ownCInfo -> withServer' . tryCommand $ allowConnection' c connId confId ownCInfo >> notify OK
@@ -933,16 +942,6 @@ runCommandProcessing c@AgentClient {subQ} server_ = do
cmdError e = notify (ERR e) >> withStore' c (`deleteCommand` cmdId)
notify :: forall e. AEntityI e => ACommand 'Agent e -> m ()
notify cmd = atomically $ writeTBQueue subQ (corrId, connId, APC (sAEntity @e) cmd)
withNextSrv :: TVar [SMPServer] -> [SMPServer] -> (SMPServerWithAuth -> m ()) -> m ()
withNextSrv usedSrvs initUsed action = do
used <- readTVarIO usedSrvs
srvAuth@(ProtoServerWithAuth srv _) <- getNextSMPServer c userId used
atomically $ do
srvs_ <- TM.lookup userId $ smpServers c
let unused = maybe [] ((\\ used) . map protoServer . L.toList) srvs_
used' = if null unused then initUsed else srv : used
writeTVar usedSrvs $! used'
action srvAuth
-- ^ ^ ^ async command processing /
enqueueMessages :: AgentMonad m => AgentClient -> ConnData -> NonEmpty SndQueue -> MsgFlags -> AMessage -> m AgentMsgId
@@ -1023,7 +1022,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl
atomically $ throwWhenNoDelivery c sq
msgId <- atomically $ readTQueue mq
atomically $ beginAgentOperation c AOSndNetwork
atomically $ endAgentOperation c AOMsgDelivery
atomically $ endAgentOperation c AOMsgDelivery -- this operation begins in queuePendingMsgs
let mId = unId msgId
E.try (withStore c $ \db -> getPendingMsgData db connId msgId) >>= \case
Left (e :: E.SomeException) ->
@@ -1185,8 +1184,8 @@ switchConnection' c connId = withConnLock c connId "switchConnection" $ do
DuplexConnection cData@ConnData {userId} rqs@(rq@RcvQueue {server, dbQueueId, sndId} :| rqs_) sqs -> do
clientVRange <- asks $ smpClientVRange . config
-- try to get the server that is different from all queues, or at least from the primary rcv queue
srvAuth@(ProtoServerWithAuth srv _) <- getNextSMPServer c userId $ map qServer (L.toList rqs) <> map qServer (L.toList sqs)
srv' <- if srv == server then getNextSMPServer c userId [server] else pure srvAuth
srvAuth@(ProtoServerWithAuth srv _) <- getNextServer c userId $ map qServer (L.toList rqs) <> map qServer (L.toList sqs)
srv' <- if srv == server then getNextServer c userId [server] else pure srvAuth
(q, qUri) <- newRcvQueue c userId connId srv' clientVRange
let rq' = (q :: RcvQueue) {primary = True, dbReplaceQueueId = Just dbQueueId}
void . withStore c $ \db -> addConnRcvQueue db connId rq'
@@ -1340,11 +1339,7 @@ connectionStats = \case
-- | Change servers to be used for creating new queues, in Reader monad
setProtocolServers' :: forall p m. (ProtocolTypeI p, UserProtocol p, AgentMonad m) => AgentClient -> UserId -> NonEmpty (ProtoServerWithAuth p) -> m ()
setProtocolServers' c userId srvs = servers >>= atomically . TM.insert userId srvs
where
servers = case protocolTypeI @p of
SPSMP -> pure $ smpServers c
SPXFTP -> pure $ xftpServers c
setProtocolServers' c userId srvs = atomically $ TM.insert userId srvs (userServers c)
registerNtfToken' :: forall m. AgentMonad m => AgentClient -> DeviceToken -> NotificationsMode -> m NtfTknStatus
registerNtfToken' c suppliedDeviceToken suppliedNtfMode =
@@ -1543,9 +1538,9 @@ sendNtfConnCommands c cmd = do
setNtfServers' :: AgentMonad' m => AgentClient -> [NtfServer] -> m ()
setNtfServers' c = atomically . writeTVar (ntfServers c)
activateAgent' :: AgentMonad' m => AgentClient -> m ()
activateAgent' c = do
atomically $ writeTVar (agentState c) ASActive
foregroundAgent' :: AgentMonad' m => AgentClient -> m ()
foregroundAgent' c = do
atomically $ writeTVar (agentState c) ASForeground
mapM_ activate $ reverse agentOperations
where
activate opSel = atomically $ modifyTVar' (opSel c) $ \s -> s {opSuspended = False}
@@ -1590,25 +1585,6 @@ debugAgentLocks' AgentClient {connLocks = cs, reconnectLocks = rs, deleteLock =
getSMPServer :: AgentMonad m => AgentClient -> UserId -> m SMPServerWithAuth
getSMPServer c userId = withUserServers c userId pickServer
pickServer :: AgentMonad' m => NonEmpty SMPServerWithAuth -> m SMPServerWithAuth
pickServer = \case
srv :| [] -> pure srv
servers -> do
gen <- asks randomServer
atomically $ (servers L.!!) <$> stateTVar gen (randomR (0, L.length servers - 1))
getNextSMPServer :: AgentMonad m => AgentClient -> UserId -> [SMPServer] -> m SMPServerWithAuth
getNextSMPServer c userId usedSrvs = withUserServers c userId $ \srvs ->
case L.nonEmpty $ deleteFirstsBy sameSrvAddr' (L.toList srvs) (map noAuthSrv usedSrvs) of
Just srvs' -> pickServer srvs'
_ -> pickServer srvs
withUserServers :: AgentMonad m => AgentClient -> UserId -> (NonEmpty SMPServerWithAuth -> m a) -> m a
withUserServers c userId action =
atomically (TM.lookup userId $ smpServers c) >>= \case
Just srvs -> action srvs
_ -> throwError $ INTERNAL "unknown userId - no SMP servers"
subscriber :: AgentMonad' m => AgentClient -> m ()
subscriber c@AgentClient {msgQ} = forever $ do
t <- atomically $ readTBQueue msgQ
@@ -1628,6 +1604,10 @@ cleanupManager c@AgentClient {subQ} = do
deleteRcvFilesExpired `catchError` (notify "" . RFERR)
deleteRcvFilesDeleted `catchError` (notify "" . RFERR)
deleteRcvFilesTmpPaths `catchError` (notify "" . RFERR)
deleteSndFilesExpired `catchError` (notify "" . SFERR)
deleteSndFilesDeleted `catchError` (notify "" . SFERR)
deleteSndFilesPrefixPaths `catchError` (notify "" . SFERR)
deleteExpiredReplicasForDeletion `catchError` (notify "" . SFERR)
liftIO $ threadDelay' int
where
deleteConns =
@@ -1635,7 +1615,7 @@ cleanupManager c@AgentClient {subQ} = do
void $ withStore' c getDeletedConnIds >>= deleteDeletedConns c
withStore' c deleteUsersWithoutConns >>= mapM_ (notify "" . DEL_USER)
deleteRcvFilesExpired = do
rcvFilesTTL <- asks (rcvFilesTTL . config)
rcvFilesTTL <- asks $ rcvFilesTTL . config
rcvExpired <- withStore' c (`getRcvFilesExpired` rcvFilesTTL)
forM_ rcvExpired $ \(dbId, entId, p) -> flip catchError (notify entId . RFERR) $ do
removePath =<< toFSFilePath p
@@ -1650,6 +1630,25 @@ cleanupManager c@AgentClient {subQ} = do
forM_ rcvTmpPaths $ \(dbId, entId, p) -> flip catchError (notify entId . RFERR) $ do
removePath =<< toFSFilePath p
withStore' c (`updateRcvFileNoTmpPath` dbId)
deleteSndFilesExpired = do
sndFilesTTL <- asks $ sndFilesTTL . config
sndExpired <- withStore' c (`getSndFilesExpired` sndFilesTTL)
forM_ sndExpired $ \(dbId, entId, p) -> flip catchError (notify entId . SFERR) $ do
forM_ p $ removePath <=< toFSFilePath
withStore' c (`deleteSndFile'` dbId)
deleteSndFilesDeleted = do
sndDeleted <- withStore' c getCleanupSndFilesDeleted
forM_ sndDeleted $ \(dbId, entId, p) -> flip catchError (notify entId . SFERR) $ do
forM_ p $ removePath <=< toFSFilePath
withStore' c (`deleteSndFile'` dbId)
deleteSndFilesPrefixPaths = do
sndPrefixPaths <- withStore' c getCleanupSndFilesPrefixPaths
forM_ sndPrefixPaths $ \(dbId, entId, p) -> flip catchError (notify entId . SFERR) $ do
removePath =<< toFSFilePath p
withStore' c (`updateSndFileNoPrefixPath` dbId)
deleteExpiredReplicasForDeletion = do
rcvFilesTTL <- asks $ rcvFilesTTL . config
withStore' c (`deleteDeletedSndChunkReplicasExpired` rcvFilesTTL)
notify :: forall e. AEntityI e => EntityId -> ACommand 'Agent e -> ExceptT AgentErrorType m ()
notify entId cmd = atomically $ writeTBQueue subQ ("", entId, APC (sAEntity @e) cmd)
+111 -15
View File
@@ -53,6 +53,10 @@ module Simplex.Messaging.Agent.Client
agentNtfCheckSubscription,
agentNtfDeleteSubscription,
agentXFTPDownloadChunk,
agentXFTPNewChunk,
agentXFTPUploadChunk,
agentXFTPAddRecipients,
agentXFTPDeleteChunk,
agentCbEncrypt,
agentCbDecrypt,
cryptoError,
@@ -77,6 +81,8 @@ module Simplex.Messaging.Agent.Client
throwWhenNoDelivery,
beginAgentOperation,
endAgentOperation,
waitUntilForeground,
checkAgentForeground,
suspendSendingAndDatabase,
suspendOperation,
notifySuspended,
@@ -84,6 +90,11 @@ module Simplex.Messaging.Agent.Client
withStore,
withStore',
storeError,
userServers,
pickServer,
getNextServer,
withUserServers,
withNextSrv,
)
where
@@ -105,7 +116,8 @@ import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Either (lefts, partitionEithers)
import Data.Functor (($>))
import Data.List (foldl', partition)
import Data.Int (Int64)
import Data.List (deleteFirstsBy, foldl', partition, (\\))
import Data.List.NonEmpty (NonEmpty (..), (<|))
import qualified Data.List.NonEmpty as L
import Data.Map.Strict (Map)
@@ -119,12 +131,12 @@ import Data.Word (Word16)
import qualified Database.SQLite.Simple as DB
import GHC.Generics (Generic)
import Network.Socket (HostName)
import Simplex.FileTransfer.Client (XFTPClient, XFTPClientConfig (..), XFTPClientError)
import Simplex.FileTransfer.Client (XFTPChunkSpec (..), XFTPClient, XFTPClientConfig (..), XFTPClientError)
import qualified Simplex.FileTransfer.Client as X
import Simplex.FileTransfer.Description (ChunkReplicaId (..), kb)
import Simplex.FileTransfer.Description (ChunkReplicaId (..), FileDigest (..), kb)
import Simplex.FileTransfer.Protocol (FileInfo (..), FileResponse, XFTPErrorType (DIGEST))
import Simplex.FileTransfer.Transport (XFTPRcvChunkSpec (..))
import Simplex.FileTransfer.Types (RcvFileChunkReplica (..))
import Simplex.FileTransfer.Types (DeletedSndChunkReplica (..), NewSndChunkReplica (..), RcvFileChunkReplica (..), SndFileChunk (..), SndFileChunkReplica (..))
import Simplex.FileTransfer.Util (uniqueCombine)
import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.Lock
@@ -156,6 +168,7 @@ import Simplex.Messaging.Protocol
NtfPublicVerifyKey,
NtfServer,
ProtoServer,
ProtoServerWithAuth (..),
Protocol (..),
ProtocolServer (..),
ProtocolTypeI (..),
@@ -164,8 +177,12 @@ import Simplex.Messaging.Protocol
RcvMessage (..),
RcvNtfPublicDhKey,
SMPMsgMeta (..),
SProtocolType (..),
SndPublicVerifyKey,
UserProtocol,
XFTPServer,
XFTPServerWithAuth,
sameSrvAddr',
)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.TMap (TMap)
@@ -173,6 +190,7 @@ import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport.Client (TransportHost)
import Simplex.Messaging.Util
import Simplex.Messaging.Version
import System.Random (randomR)
import System.Timeout (timeout)
import UnliftIO (mapConcurrently)
import UnliftIO.Directory (getTemporaryDirectory)
@@ -250,7 +268,7 @@ agentOperations = [ntfNetworkOp, rcvNetworkOp, msgDeliveryOp, sndNetworkOp, data
data AgentOpState = AgentOpState {opSuspended :: Bool, opsInProgress :: Int}
data AgentState = ASActive | ASSuspending | ASSuspended
data AgentState = ASForeground | ASSuspending | ASSuspended
deriving (Eq, Show)
data AgentLocks = AgentLocks {connLocks :: Map String String, srvLocks :: Map String String, delLock :: Maybe String}
@@ -295,7 +313,7 @@ newAgentClient InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = do
msgDeliveryOp <- newTVar $ AgentOpState False 0
sndNetworkOp <- newTVar $ AgentOpState False 0
databaseOp <- newTVar $ AgentOpState False 0
agentState <- newTVar ASActive
agentState <- newTVar ASForeground
getMsgLocks <- TM.empty
connLocks <- TM.empty
deleteLock <- createLock
@@ -606,9 +624,9 @@ closeClient_ c cVar = do
Just (Right client) -> closeProtocolServerClient client `catchAll_` pure ()
_ -> pure ()
closeXFTPServerClient :: AgentMonad' m => AgentClient -> UserId -> RcvFileChunkReplica -> m ()
closeXFTPServerClient c userId RcvFileChunkReplica {server, replicaId = ChunkReplicaId fId} =
mkTransportSession c userId server fId >>= liftIO . closeClient c xftpClients
closeXFTPServerClient :: AgentMonad' m => AgentClient -> UserId -> XFTPServer -> ByteString -> m ()
closeXFTPServerClient c userId server entityId =
mkTransportSession c userId server entityId >>= liftIO . closeClient c xftpClients
cancelActions :: (Foldable f, Monoid (f (Async ()))) => TVar (f (Async ())) -> IO ()
cancelActions as = atomically (swapTVar as mempty) >>= mapM_ (forkIO . uninterruptibleCancel)
@@ -667,9 +685,9 @@ withXFTPClient ::
ByteString ->
(Client msg -> ExceptT (ProtocolClientError err) IO b) ->
m b
withXFTPClient c (userId, srv, fId) cmdStr action = do
tSess <- mkTransportSession c userId srv fId
withLogClient c tSess (strEncode fId) cmdStr action
withXFTPClient c (userId, srv, entityId) cmdStr action = do
tSess <- mkTransportSession c userId srv entityId
withLogClient c tSess entityId cmdStr action
liftClient :: (AgentMonad m, Show err, Encoding err) => (err -> AgentErrorType) -> HostName -> ExceptT (ProtocolClientError err) IO a -> m a
liftClient protocolError_ = liftError . protocolClientError protocolError_
@@ -1060,9 +1078,44 @@ agentNtfDeleteSubscription :: AgentMonad m => AgentClient -> NtfSubscriptionId -
agentNtfDeleteSubscription c subId NtfToken {ntfServer, ntfPrivKey} =
withNtfClient c ntfServer subId "SDEL" $ \ntf -> ntfDeleteSubscription ntf ntfPrivKey subId
agentXFTPDownloadChunk :: AgentMonad m => AgentClient -> UserId -> RcvFileChunkReplica -> XFTPRcvChunkSpec -> m ()
agentXFTPDownloadChunk c userId RcvFileChunkReplica {server, replicaId = ChunkReplicaId fId, replicaKey} chunkSpec =
withXFTPClient c (userId, server, fId) "FGET" $ \xftp -> X.downloadXFTPChunk xftp replicaKey fId chunkSpec
agentXFTPDownloadChunk :: AgentMonad m => AgentClient -> UserId -> Int64 -> RcvFileChunkReplica -> XFTPRcvChunkSpec -> m ()
agentXFTPDownloadChunk c userId rcvChunkId RcvFileChunkReplica {server, replicaId = ChunkReplicaId fId, replicaKey} chunkSpec =
withXFTPClient c (userId, server, bshow rcvChunkId) "FGET" $ \xftp -> X.downloadXFTPChunk xftp replicaKey fId chunkSpec
agentXFTPNewChunk :: AgentMonad m => AgentClient -> SndFileChunk -> Int -> XFTPServerWithAuth -> m NewSndChunkReplica
agentXFTPNewChunk c SndFileChunk {userId, sndChunkId, chunkSpec = XFTPChunkSpec {chunkSize}, digest = FileDigest digest} n (ProtoServerWithAuth srv auth) = do
rKeys <- xftpRcvKeys n
(sndKey, replicaKey) <- liftIO $ C.generateSignatureKeyPair C.SEd25519
let fileInfo = FileInfo {sndKey, size = fromIntegral chunkSize, digest}
logServer "-->" c srv "" "FNEW"
tSess <- mkTransportSession c userId srv $ bshow sndChunkId
(sndId, rIds) <- withClient c tSess "FNEW" $ \xftp -> X.createXFTPChunk xftp replicaKey fileInfo (L.map fst rKeys) auth
logServer "<--" c srv "" $ B.unwords ["SIDS", logSecret sndId]
pure NewSndChunkReplica {server = srv, replicaId = ChunkReplicaId sndId, replicaKey, rcvIdsKeys = L.toList $ xftpRcvIdsKeys rIds rKeys}
agentXFTPUploadChunk :: AgentMonad m => AgentClient -> UserId -> Int64 -> SndFileChunkReplica -> XFTPChunkSpec -> m ()
agentXFTPUploadChunk c userId sndChunkId SndFileChunkReplica {server, replicaId = ChunkReplicaId fId, replicaKey} chunkSpec =
withXFTPClient c (userId, server, bshow sndChunkId) "FPUT" $ \xftp -> X.uploadXFTPChunk xftp replicaKey fId chunkSpec
agentXFTPAddRecipients :: AgentMonad m => AgentClient -> UserId -> Int64 -> SndFileChunkReplica -> Int -> m (NonEmpty (ChunkReplicaId, C.APrivateSignKey))
agentXFTPAddRecipients c userId sndChunkId SndFileChunkReplica {server, replicaId = ChunkReplicaId fId, replicaKey} n = do
rKeys <- xftpRcvKeys n
rIds <- withXFTPClient c (userId, server, bshow sndChunkId) "FADD" $ \xftp -> X.addXFTPRecipients xftp replicaKey fId (L.map fst rKeys)
pure $ xftpRcvIdsKeys rIds rKeys
agentXFTPDeleteChunk :: AgentMonad m => AgentClient -> UserId -> DeletedSndChunkReplica -> m ()
agentXFTPDeleteChunk c userId DeletedSndChunkReplica {server, replicaId = ChunkReplicaId fId, replicaKey} =
withXFTPClient c (userId, server, fId) "FDEL" $ \xftp -> X.deleteXFTPChunk xftp replicaKey fId
xftpRcvKeys :: AgentMonad m => Int -> m (NonEmpty C.ASignatureKeyPair)
xftpRcvKeys n = do
rKeys <- liftIO $ replicateM n $ C.generateSignatureKeyPair C.SEd25519
case L.nonEmpty rKeys of
Just rKeys' -> pure rKeys'
_ -> throwError $ INTERNAL "non-positive number of recipients"
xftpRcvIdsKeys :: NonEmpty ByteString -> NonEmpty C.ASignatureKeyPair -> NonEmpty (ChunkReplicaId, C.APrivateSignKey)
xftpRcvIdsKeys rIds rKeys = L.map ChunkReplicaId rIds `L.zip` L.map snd rKeys
agentCbEncrypt :: AgentMonad m => SndQueue -> Maybe C.PublicKeyX25519 -> ByteString -> m ByteString
agentCbEncrypt SndQueue {e2eDhSecret, smpClientVersion} e2ePubKey msg = do
@@ -1161,6 +1214,14 @@ agentOperationBracket c op check action =
(\_ -> atomically $ endAgentOperation c op)
(const action)
waitUntilForeground :: AgentClient -> STM ()
waitUntilForeground c = unlessM ((ASForeground ==) <$> readTVar (agentState c)) retry
checkAgentForeground :: AgentClient -> STM ()
checkAgentForeground c = do
throwWhenInactive c
waitUntilForeground c
withStore' :: AgentMonad m => AgentClient -> (DB.Connection -> IO a) -> m a
withStore' c action = withStore c $ fmap Right . action
@@ -1208,3 +1269,38 @@ incClientStatN c userId pc n cmd res = do
atomically $ incStat c n statsKey
where
statsKey = AgentStatsKey {userId, host = strEncode $ clientTransportHost pc, clientTs = strEncode $ clientSessionTs pc, cmd, res}
userServers :: forall p. (ProtocolTypeI p, UserProtocol p) => AgentClient -> TMap UserId (NonEmpty (ProtoServerWithAuth p))
userServers c = case protocolTypeI @p of
SPSMP -> smpServers c
SPXFTP -> xftpServers c
pickServer :: forall p m. (AgentMonad' m) => NonEmpty (ProtoServerWithAuth p) -> m (ProtoServerWithAuth p)
pickServer = \case
srv :| [] -> pure srv
servers -> do
gen <- asks randomServer
atomically $ (servers L.!!) <$> stateTVar gen (randomR (0, L.length servers - 1))
getNextServer :: forall p m. (ProtocolTypeI p, UserProtocol p, AgentMonad m) => AgentClient -> UserId -> [ProtocolServer p] -> m (ProtoServerWithAuth p)
getNextServer c userId usedSrvs = withUserServers c userId $ \srvs ->
case L.nonEmpty $ deleteFirstsBy sameSrvAddr' (L.toList srvs) (map noAuthSrv usedSrvs) of
Just srvs' -> pickServer srvs'
_ -> pickServer srvs
withUserServers :: forall p m a. (ProtocolTypeI p, UserProtocol p, AgentMonad m) => AgentClient -> UserId -> (NonEmpty (ProtoServerWithAuth p) -> m a) -> m a
withUserServers c userId action =
atomically (TM.lookup userId $ userServers c) >>= \case
Just srvs -> action srvs
_ -> throwError $ INTERNAL "unknown userId - no user servers"
withNextSrv :: forall p m a. (ProtocolTypeI p, UserProtocol p, AgentMonad m) => AgentClient -> UserId -> TVar [ProtocolServer p] -> [ProtocolServer p] -> ((ProtoServerWithAuth p) -> m a) -> m a
withNextSrv c userId usedSrvs initUsed action = do
used <- readTVarIO usedSrvs
srvAuth@(ProtoServerWithAuth srv _) <- getNextServer c userId used
atomically $ do
srvs_ <- TM.lookup userId $ userServers c
let unused = maybe [] ((\\ used) . map protoServer . L.toList) srvs_
used' = if null unused then initUsed else srv : used
writeTVar usedSrvs $! used'
action srvAuth
+11 -9
View File
@@ -83,7 +83,9 @@ data AgentConfig = AgentConfig
initialCleanupDelay :: Int64,
cleanupInterval :: Int64,
rcvFilesTTL :: NominalDiffTime,
sndFilesTTL :: NominalDiffTime,
xftpNotifyErrsOnRetry :: Bool,
xftpMaxRecipientsPerRequest :: Int,
deleteErrorCount :: Int,
ntfCron :: Word16,
ntfWorkerDelay :: Int,
@@ -144,7 +146,9 @@ defaultAgentConfig =
initialCleanupDelay = 30 * 1000000, -- 30 seconds
cleanupInterval = 30 * 60 * 1000000, -- 30 minutes
rcvFilesTTL = 2 * nominalDay,
sndFilesTTL = nominalDay,
xftpNotifyErrsOnRetry = True,
xftpMaxRecipientsPerRequest = 200,
deleteErrorCount = 10,
ntfCron = 20, -- minutes
ntfWorkerDelay = 100000, -- microseconds
@@ -205,17 +209,15 @@ newNtfSubSupervisor qSize = do
data XFTPAgent = XFTPAgent
{ -- if set, XFTP file paths will be considered as relative to this directory
xftpWorkDir :: TVar (Maybe FilePath),
xftpWorkers :: TMap (Maybe XFTPServer) (TMVar (), Async ())
-- separate send workers for unhindered concurrency between download and upload,
-- clients can also be separate by passing direction to withXFTPClient, and differentiating by it
-- xftpSndWorkers :: TMap (Maybe XFTPServer) (TMVar (), Async ()),
-- files currently in upload - to throttle upload of other files' chunks,
-- this optimization can be dropped for the MVP
-- xftpSndFiles :: TVar (Set DBSndFileId)
xftpRcvWorkers :: TMap (Maybe XFTPServer) (TMVar (), Async ()),
xftpSndWorkers :: TMap (Maybe XFTPServer) (TMVar (), Async ()),
xftpDelWorkers :: TMap XFTPServer (TMVar (), Async ())
}
newXFTPAgent :: STM XFTPAgent
newXFTPAgent = do
xftpWorkDir <- newTVar Nothing
xftpWorkers <- TM.empty
pure XFTPAgent {xftpWorkDir, xftpWorkers}
xftpRcvWorkers <- TM.empty
xftpSndWorkers <- TM.empty
xftpDelWorkers <- TM.empty
pure XFTPAgent {xftpWorkDir, xftpRcvWorkers, xftpSndWorkers, xftpDelWorkers}
+2 -2
View File
@@ -48,10 +48,10 @@ updateRetryInterval2 RI2State {slowInterval, fastInterval} RetryInterval2 {riSlo
data RetryIntervalMode = RISlow | RIFast
deriving (Eq, Show)
withRetryInterval :: forall m. MonadIO m => RetryInterval -> (Int64 -> m () -> m ()) -> m ()
withRetryInterval :: forall m a. MonadIO m => RetryInterval -> (Int64 -> m a -> m a) -> m a
withRetryInterval ri action = callAction 0 $ initialInterval ri
where
callAction :: Int64 -> Int64 -> m ()
callAction :: Int64 -> Int64 -> m a
callAction elapsed delay = action delay loop
where
loop = do
+2
View File
@@ -539,4 +539,6 @@ data StoreError
SEXFTPServerNotFound
| -- | XFTP File not found.
SEFileNotFound
| -- | XFTP Deleted snd chunk replica not found.
SEDeletedSndChunkReplicaNotFound
deriving (Eq, Show, Exception)
+415 -12
View File
@@ -129,7 +129,10 @@ module Simplex.Messaging.Agent.Store.SQLite
getActiveNtfToken,
getNtfRcvQueue,
setConnectionNtfs,
-- File transfer
-- * File transfer
-- Rcv files
createRcvFile,
getRcvFile,
getRcvFileByEntityId,
@@ -147,6 +150,34 @@ module Simplex.Messaging.Agent.Store.SQLite
getCleanupRcvFilesTmpPaths,
getCleanupRcvFilesDeleted,
getRcvFilesExpired,
-- Snd files
createSndFile,
getSndFile,
getSndFileByEntityId,
getNextSndFileToPrepare,
updateSndFileError,
updateSndFileStatus,
updateSndFileEncrypted,
updateSndFileComplete,
updateSndFileNoPrefixPath,
updateSndFileDeleted,
deleteSndFile',
getSndFileDeleted,
createSndFileReplica,
getNextSndChunkToUpload,
updateSndChunkReplicaDelay,
addSndChunkReplicaRecipients,
updateSndChunkReplicaStatus,
getPendingSndFilesServers,
getCleanupSndFilesPrefixPaths,
getCleanupSndFilesDeleted,
getSndFilesExpired,
createDeletedSndChunkReplica,
getNextDeletedSndChunkReplica,
updateDeletedSndChunkReplicaDelay,
deleteDeletedSndChunkReplica,
getPendingDelFilesServers,
deleteDeletedSndChunkReplicasExpired,
-- * utilities
withConnection,
@@ -191,6 +222,7 @@ import Database.SQLite.Simple.ToField (ToField (..))
import qualified Database.SQLite3 as SQLite3
import GHC.Generics (Generic)
import Network.Socket (ServiceName)
import Simplex.FileTransfer.Client (XFTPChunkSpec (..))
import Simplex.FileTransfer.Description
import Simplex.FileTransfer.Protocol (FileParty (..))
import Simplex.FileTransfer.Types
@@ -1868,9 +1900,9 @@ getRcvFileIdByEntityId_ db userId rcvFileEntityId =
getRcvFile :: DB.Connection -> DBRcvFileId -> IO (Either StoreError RcvFile)
getRcvFile db rcvFileId = runExceptT $ do
fd@RcvFile {rcvFileEntityId, userId, tmpPath} <- ExceptT getFile
f@RcvFile {rcvFileEntityId, userId, tmpPath} <- ExceptT getFile
chunks <- maybe (pure []) (liftIO . getChunks rcvFileEntityId userId) tmpPath
pure (fd {chunks} :: RcvFile)
pure (f {chunks} :: RcvFile)
where
getFile :: IO (Either StoreError RcvFile)
getFile = do
@@ -1931,12 +1963,11 @@ updateRcvChunkReplicaDelay db replicaId delay = do
updatedAt <- getCurrentTime
DB.execute db "UPDATE rcv_file_chunk_replicas SET delay = ?, retries = retries + 1, updated_at = ? WHERE rcv_file_chunk_replica_id = ?" (delay, updatedAt, replicaId)
updateRcvFileChunkReceived :: DB.Connection -> Int64 -> Int64 -> DBRcvFileId -> FilePath -> IO (Either StoreError RcvFile)
updateRcvFileChunkReceived db rId cId fId chunkTmpPath = do
updateRcvFileChunkReceived :: DB.Connection -> Int64 -> Int64 -> FilePath -> IO ()
updateRcvFileChunkReceived db replicaId chunkId chunkTmpPath = do
updatedAt <- getCurrentTime
DB.execute db "UPDATE rcv_file_chunk_replicas SET received = 1, updated_at = ? WHERE rcv_file_chunk_replica_id = ?" (updatedAt, rId)
DB.execute db "UPDATE rcv_file_chunks SET tmp_path = ?, updated_at = ? WHERE rcv_file_chunk_id = ?" (chunkTmpPath, updatedAt, cId)
getRcvFile db fId
DB.execute db "UPDATE rcv_file_chunk_replicas SET received = 1, updated_at = ? WHERE rcv_file_chunk_replica_id = ?" (updatedAt, replicaId)
DB.execute db "UPDATE rcv_file_chunks SET tmp_path = ?, updated_at = ? WHERE rcv_file_chunk_id = ?" (chunkTmpPath, updatedAt, chunkId)
updateRcvFileStatus :: DB.Connection -> DBRcvFileId -> RcvFileStatus -> IO ()
updateRcvFileStatus db rcvFileId status = do
@@ -2025,7 +2056,7 @@ getNextRcvFileToDecrypt db ttl = do
getPendingRcvFilesServers :: DB.Connection -> NominalDiffTime -> IO [XFTPServer]
getPendingRcvFilesServers db ttl = do
cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime
map toServer
map toXFTPServer
<$> DB.query
db
[sql|
@@ -2039,9 +2070,9 @@ getPendingRcvFilesServers db ttl = do
AND f.status = ? AND f.deleted = 0 AND f.created_at >= ?
|]
(RFSReceiving, cutoffTs)
where
toServer :: (NonEmpty TransportHost, ServiceName, C.KeyHash) -> XFTPServer
toServer (host, port, keyHash) = XFTPServer host port keyHash
toXFTPServer :: (NonEmpty TransportHost, ServiceName, C.KeyHash) -> XFTPServer
toXFTPServer (host, port, keyHash) = XFTPServer host port keyHash
getCleanupRcvFilesTmpPaths :: DB.Connection -> IO [(DBRcvFileId, RcvFileId, FilePath)]
getCleanupRcvFilesTmpPaths db =
@@ -2075,3 +2106,375 @@ getRcvFilesExpired db ttl = do
WHERE created_at < ?
|]
(Only cutoffTs)
createSndFile :: DB.Connection -> TVar ChaChaDRG -> UserId -> Int -> FilePath -> FilePath -> C.SbKey -> C.CbNonce -> IO (Either StoreError SndFileId)
createSndFile db gVar userId numRecipients path prefixPath key nonce =
createWithRandomId gVar $ \sndFileEntityId ->
DB.execute
db
"INSERT INTO snd_files (snd_file_entity_id, user_id, num_recipients, key, nonce, path, prefix_path, status) VALUES (?,?,?,?,?,?,?,?)"
(sndFileEntityId, userId, numRecipients, key, nonce, path, prefixPath, SFSNew)
getSndFileByEntityId :: DB.Connection -> UserId -> SndFileId -> IO (Either StoreError SndFile)
getSndFileByEntityId db userId sndFileEntityId = runExceptT $ do
sndFileId <- ExceptT $ getSndFileIdByEntityId_ db userId sndFileEntityId
ExceptT $ getSndFile db sndFileId
getSndFileIdByEntityId_ :: DB.Connection -> UserId -> SndFileId -> IO (Either StoreError DBSndFileId)
getSndFileIdByEntityId_ db userId sndFileEntityId =
firstRow fromOnly SEFileNotFound $
DB.query db "SELECT snd_file_id FROM snd_files WHERE user_id = ? AND snd_file_entity_id = ?" (userId, sndFileEntityId)
getSndFile :: DB.Connection -> DBSndFileId -> IO (Either StoreError SndFile)
getSndFile db sndFileId = runExceptT $ do
f@SndFile {sndFileEntityId, userId, numRecipients, prefixPath} <- ExceptT getFile
chunks <- maybe (pure []) (liftIO . getChunks sndFileEntityId userId numRecipients) prefixPath
pure (f {chunks} :: SndFile)
where
getFile :: IO (Either StoreError SndFile)
getFile = do
firstRow toFile SEFileNotFound $
DB.query
db
[sql|
SELECT snd_file_entity_id, user_id, num_recipients, digest, key, nonce, path, prefix_path, status, deleted
FROM snd_files
WHERE snd_file_id = ?
|]
(Only sndFileId)
where
toFile :: (SndFileId, UserId, Int, Maybe FileDigest, C.SbKey, C.CbNonce, FilePath, Maybe FilePath, SndFileStatus, Bool) -> SndFile
toFile (sndFileEntityId, userId, numRecipients, digest, key, nonce, filePath, prefixPath, status, deleted) =
SndFile {sndFileId, sndFileEntityId, userId, numRecipients, digest, key, nonce, filePath, prefixPath, status, deleted, chunks = []}
getChunks :: SndFileId -> UserId -> Int -> FilePath -> IO [SndFileChunk]
getChunks sndFileEntityId userId numRecipients filePrefixPath = do
chunks <-
map toChunk
<$> DB.query
db
[sql|
SELECT snd_file_chunk_id, chunk_no, chunk_offset, chunk_size, digest
FROM snd_file_chunks
WHERE snd_file_id = ?
|]
(Only sndFileId)
forM chunks $ \chunk@SndFileChunk {sndChunkId} -> do
replicas' <- getChunkReplicas sndChunkId
pure (chunk {replicas = replicas'} :: SndFileChunk)
where
toChunk :: (Int64, Int, Int64, Word32, FileDigest) -> SndFileChunk
toChunk (sndChunkId, chunkNo, chunkOffset, chunkSize, digest) =
let chunkSpec = XFTPChunkSpec {filePath = sndFileEncPath filePrefixPath, chunkOffset, chunkSize}
in SndFileChunk {sndFileId, sndFileEntityId, userId, numRecipients, sndChunkId, chunkNo, chunkSpec, filePrefixPath, digest, replicas = []}
getChunkReplicas :: Int64 -> IO [SndFileChunkReplica]
getChunkReplicas chunkId = do
replicas <-
map toReplica
<$> DB.query
db
[sql|
SELECT
r.snd_file_chunk_replica_id, r.replica_id, r.replica_key, r.replica_status, r.delay, r.retries,
s.xftp_host, s.xftp_port, s.xftp_key_hash
FROM snd_file_chunk_replicas r
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
WHERE r.snd_file_chunk_id = ?
|]
(Only chunkId)
forM replicas $ \replica@SndFileChunkReplica {sndChunkReplicaId} -> do
rcvIdsKeys <- getChunkReplicaRecipients_ db sndChunkReplicaId
pure (replica :: SndFileChunkReplica) {rcvIdsKeys}
where
toReplica :: (Int64, ChunkReplicaId, C.APrivateSignKey, SndFileReplicaStatus, Maybe Int64, Int, NonEmpty TransportHost, ServiceName, C.KeyHash) -> SndFileChunkReplica
toReplica (sndChunkReplicaId, replicaId, replicaKey, replicaStatus, delay, retries, host, port, keyHash) =
let server = XFTPServer host port keyHash
in SndFileChunkReplica {sndChunkReplicaId, server, replicaId, replicaKey, replicaStatus, delay, retries, rcvIdsKeys = []}
getChunkReplicaRecipients_ :: DB.Connection -> Int64 -> IO [(ChunkReplicaId, C.APrivateSignKey)]
getChunkReplicaRecipients_ db replicaId =
DB.query
db
[sql|
SELECT rcv_replica_id, rcv_replica_key
FROM snd_file_chunk_replica_recipients
WHERE snd_file_chunk_replica_id = ?
|]
(Only replicaId)
getNextSndFileToPrepare :: DB.Connection -> NominalDiffTime -> IO (Maybe SndFile)
getNextSndFileToPrepare db ttl = do
cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime
fileId_ :: Maybe DBSndFileId <-
maybeFirstRow fromOnly $
DB.query
db
[sql|
SELECT snd_file_id
FROM snd_files
WHERE status IN (?,?,?) AND deleted = 0 AND created_at >= ?
ORDER BY created_at ASC LIMIT 1
|]
(SFSNew, SFSEncrypting, SFSEncrypted, cutoffTs)
case fileId_ of
Nothing -> pure Nothing
Just fileId -> eitherToMaybe <$> getSndFile db fileId
updateSndFileError :: DB.Connection -> DBSndFileId -> String -> IO ()
updateSndFileError db sndFileId errStr = do
updatedAt <- getCurrentTime
DB.execute db "UPDATE snd_files SET prefix_path = NULL, error = ?, status = ?, updated_at = ? WHERE snd_file_id = ?" (errStr, SFSError, updatedAt, sndFileId)
updateSndFileStatus :: DB.Connection -> DBSndFileId -> SndFileStatus -> IO ()
updateSndFileStatus db sndFileId status = do
updatedAt <- getCurrentTime
DB.execute db "UPDATE snd_files SET status = ?, updated_at = ? WHERE snd_file_id = ?" (status, updatedAt, sndFileId)
updateSndFileEncrypted :: DB.Connection -> DBSndFileId -> FileDigest -> [(XFTPChunkSpec, FileDigest)] -> IO ()
updateSndFileEncrypted db sndFileId digest chunkSpecsDigests = do
updatedAt <- getCurrentTime
DB.execute db "UPDATE snd_files SET status = ?, digest = ?, updated_at = ? WHERE snd_file_id = ?" (SFSEncrypted, digest, updatedAt, sndFileId)
forM_ (zip [1 ..] chunkSpecsDigests) $ \(chunkNo :: Int, (XFTPChunkSpec {chunkOffset, chunkSize}, chunkDigest)) ->
DB.execute db "INSERT INTO snd_file_chunks (snd_file_id, chunk_no, chunk_offset, chunk_size, digest) VALUES (?,?,?,?,?)" (sndFileId, chunkNo, chunkOffset, chunkSize, chunkDigest)
updateSndFileComplete :: DB.Connection -> DBSndFileId -> IO ()
updateSndFileComplete db sndFileId = do
updatedAt <- getCurrentTime
DB.execute db "UPDATE snd_files SET prefix_path = NULL, status = ?, updated_at = ? WHERE snd_file_id = ?" (SFSComplete, updatedAt, sndFileId)
updateSndFileNoPrefixPath :: DB.Connection -> DBSndFileId -> IO ()
updateSndFileNoPrefixPath db sndFileId = do
updatedAt <- getCurrentTime
DB.execute db "UPDATE snd_files SET prefix_path = NULL, updated_at = ? WHERE snd_file_id = ?" (updatedAt, sndFileId)
updateSndFileDeleted :: DB.Connection -> DBSndFileId -> IO ()
updateSndFileDeleted db sndFileId = do
updatedAt <- getCurrentTime
DB.execute db "UPDATE snd_files SET deleted = 1, updated_at = ? WHERE snd_file_id = ?" (updatedAt, sndFileId)
deleteSndFile' :: DB.Connection -> DBSndFileId -> IO ()
deleteSndFile' db sndFileId =
DB.execute db "DELETE FROM snd_files WHERE snd_file_id = ?" (Only sndFileId)
getSndFileDeleted :: DB.Connection -> DBSndFileId -> IO Bool
getSndFileDeleted db sndFileId =
fromMaybe True
<$> maybeFirstRow fromOnly (DB.query db "SELECT deleted FROM snd_files WHERE snd_file_id = ?" (Only sndFileId))
createSndFileReplica :: DB.Connection -> SndFileChunk -> NewSndChunkReplica -> IO ()
createSndFileReplica db SndFileChunk {sndChunkId} NewSndChunkReplica {server, replicaId, replicaKey, rcvIdsKeys} = do
srvId <- createXFTPServer_ db server
DB.execute
db
[sql|
INSERT INTO snd_file_chunk_replicas
(snd_file_chunk_id, replica_number, xftp_server_id, replica_id, replica_key, replica_status)
VALUES (?,?,?,?,?,?)
|]
(sndChunkId, 1 :: Int, srvId, replicaId, replicaKey, SFRSCreated)
rId <- insertedRowId db
forM_ rcvIdsKeys $ \(rcvId, rcvKey) -> do
DB.execute
db
[sql|
INSERT INTO snd_file_chunk_replica_recipients
(snd_file_chunk_replica_id, rcv_replica_id, rcv_replica_key)
VALUES (?,?,?)
|]
(rId, rcvId, rcvKey)
getNextSndChunkToUpload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Maybe SndFileChunk)
getNextSndChunkToUpload db server@ProtocolServer {host, port, keyHash} ttl = do
cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime
chunk_ <-
maybeFirstRow toChunk $
DB.query
db
[sql|
SELECT
f.snd_file_id, f.snd_file_entity_id, f.user_id, f.num_recipients, f.prefix_path,
c.snd_file_chunk_id, c.chunk_no, c.chunk_offset, c.chunk_size, c.digest,
r.snd_file_chunk_replica_id, r.replica_id, r.replica_key, r.replica_status, r.delay, r.retries
FROM snd_file_chunk_replicas r
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
JOIN snd_file_chunks c ON c.snd_file_chunk_id = r.snd_file_chunk_id
JOIN snd_files f ON f.snd_file_id = c.snd_file_id
WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ?
AND r.replica_status = ? AND r.replica_number = 1
AND (f.status = ? OR f.status = ?) AND f.deleted = 0 AND f.created_at >= ?
ORDER BY r.created_at ASC
LIMIT 1
|]
(host, port, keyHash, SFRSCreated, SFSEncrypted, SFSUploading, cutoffTs)
forM chunk_ $ \chunk@SndFileChunk {replicas} -> do
replicas' <- forM replicas $ \replica@SndFileChunkReplica {sndChunkReplicaId} -> do
rcvIdsKeys <- getChunkReplicaRecipients_ db sndChunkReplicaId
pure (replica :: SndFileChunkReplica) {rcvIdsKeys}
pure (chunk {replicas = replicas'} :: SndFileChunk)
where
toChunk :: ((DBSndFileId, SndFileId, UserId, Int, FilePath) :. (Int64, Int, Int64, Word32, FileDigest) :. (Int64, ChunkReplicaId, C.APrivateSignKey, SndFileReplicaStatus, Maybe Int64, Int)) -> SndFileChunk
toChunk ((sndFileId, sndFileEntityId, userId, numRecipients, filePrefixPath) :. (sndChunkId, chunkNo, chunkOffset, chunkSize, digest) :. (sndChunkReplicaId, replicaId, replicaKey, replicaStatus, delay, retries)) =
let chunkSpec = XFTPChunkSpec {filePath = sndFileEncPath filePrefixPath, chunkOffset, chunkSize}
in SndFileChunk
{ sndFileId,
sndFileEntityId,
userId,
numRecipients,
sndChunkId,
chunkNo,
chunkSpec,
digest,
filePrefixPath,
replicas = [SndFileChunkReplica {sndChunkReplicaId, server, replicaId, replicaKey, replicaStatus, delay, retries, rcvIdsKeys = []}]
}
updateSndChunkReplicaDelay :: DB.Connection -> Int64 -> Int64 -> IO ()
updateSndChunkReplicaDelay db replicaId delay = do
updatedAt <- getCurrentTime
DB.execute db "UPDATE snd_file_chunk_replicas SET delay = ?, retries = retries + 1, updated_at = ? WHERE snd_file_chunk_replica_id = ?" (delay, updatedAt, replicaId)
addSndChunkReplicaRecipients :: DB.Connection -> SndFileChunkReplica -> [(ChunkReplicaId, C.APrivateSignKey)] -> IO SndFileChunkReplica
addSndChunkReplicaRecipients db r@SndFileChunkReplica {sndChunkReplicaId} rcvIdsKeys = do
forM_ rcvIdsKeys $ \(rcvId, rcvKey) -> do
DB.execute
db
[sql|
INSERT INTO snd_file_chunk_replica_recipients
(snd_file_chunk_replica_id, rcv_replica_id, rcv_replica_key)
VALUES (?,?,?)
|]
(sndChunkReplicaId, rcvId, rcvKey)
rcvIdsKeys' <- getChunkReplicaRecipients_ db sndChunkReplicaId
pure (r :: SndFileChunkReplica) {rcvIdsKeys = rcvIdsKeys'}
updateSndChunkReplicaStatus :: DB.Connection -> Int64 -> SndFileReplicaStatus -> IO ()
updateSndChunkReplicaStatus db replicaId status = do
updatedAt <- getCurrentTime
DB.execute db "UPDATE snd_file_chunk_replicas SET replica_status = ?, updated_at = ? WHERE snd_file_chunk_replica_id = ?" (status, updatedAt, replicaId)
getPendingSndFilesServers :: DB.Connection -> NominalDiffTime -> IO [XFTPServer]
getPendingSndFilesServers db ttl = do
cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime
map toXFTPServer
<$> DB.query
db
[sql|
SELECT DISTINCT
s.xftp_host, s.xftp_port, s.xftp_key_hash
FROM snd_file_chunk_replicas r
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
JOIN snd_file_chunks c ON c.snd_file_chunk_id = r.snd_file_chunk_id
JOIN snd_files f ON f.snd_file_id = c.snd_file_id
WHERE r.replica_status = ? AND r.replica_number = 1
AND (f.status = ? OR f.status = ?) AND f.deleted = 0 AND f.created_at >= ?
|]
(SFRSCreated, SFSEncrypted, SFSUploading, cutoffTs)
getCleanupSndFilesPrefixPaths :: DB.Connection -> IO [(DBSndFileId, SndFileId, FilePath)]
getCleanupSndFilesPrefixPaths db =
DB.query
db
[sql|
SELECT snd_file_id, snd_file_entity_id, prefix_path
FROM snd_files
WHERE status IN (?,?) AND prefix_path IS NOT NULL
|]
(SFSComplete, SFSError)
getCleanupSndFilesDeleted :: DB.Connection -> IO [(DBSndFileId, SndFileId, Maybe FilePath)]
getCleanupSndFilesDeleted db =
DB.query_
db
[sql|
SELECT snd_file_id, snd_file_entity_id, prefix_path
FROM snd_files
WHERE deleted = 1
|]
getSndFilesExpired :: DB.Connection -> NominalDiffTime -> IO [(DBSndFileId, SndFileId, Maybe FilePath)]
getSndFilesExpired db ttl = do
cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime
DB.query
db
[sql|
SELECT snd_file_id, snd_file_entity_id, prefix_path
FROM snd_files
WHERE created_at < ?
|]
(Only cutoffTs)
createDeletedSndChunkReplica :: DB.Connection -> UserId -> FileChunkReplica -> IO ()
createDeletedSndChunkReplica db userId FileChunkReplica {server, replicaId, replicaKey} = do
srvId <- createXFTPServer_ db server
DB.execute
db
"INSERT INTO deleted_snd_chunk_replicas (user_id, xftp_server_id, replica_id, replica_key) VALUES (?,?,?,?)"
(userId, srvId, replicaId, replicaKey)
getDeletedSndChunkReplica :: DB.Connection -> DBSndFileId -> IO (Either StoreError DeletedSndChunkReplica)
getDeletedSndChunkReplica db deletedSndChunkReplicaId =
firstRow toReplica SEDeletedSndChunkReplicaNotFound $
DB.query
db
[sql|
SELECT
r.user_id, r.replica_id, r.replica_key, r.delay, r.retries,
s.xftp_host, s.xftp_port, s.xftp_key_hash
FROM deleted_snd_chunk_replicas r
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
WHERE r.deleted_snd_chunk_replica_id = ?
|]
(Only deletedSndChunkReplicaId)
where
toReplica :: (UserId, ChunkReplicaId, C.APrivateSignKey, Maybe Int64, Int, NonEmpty TransportHost, ServiceName, C.KeyHash) -> DeletedSndChunkReplica
toReplica (userId, replicaId, replicaKey, delay, retries, host, port, keyHash) =
let server = XFTPServer host port keyHash
in DeletedSndChunkReplica {deletedSndChunkReplicaId, userId, server, replicaId, replicaKey, delay, retries}
getNextDeletedSndChunkReplica :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Maybe DeletedSndChunkReplica)
getNextDeletedSndChunkReplica db ProtocolServer {host, port, keyHash} ttl = do
cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime
replicaId_ :: Maybe Int64 <-
maybeFirstRow fromOnly $
DB.query
db
[sql|
SELECT r.deleted_snd_chunk_replica_id
FROM deleted_snd_chunk_replicas r
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ?
AND r.created_at >= ?
ORDER BY r.created_at ASC LIMIT 1
|]
(host, port, keyHash, cutoffTs)
case replicaId_ of
Nothing -> pure Nothing
Just replicaId -> eitherToMaybe <$> getDeletedSndChunkReplica db replicaId
updateDeletedSndChunkReplicaDelay :: DB.Connection -> Int64 -> Int64 -> IO ()
updateDeletedSndChunkReplicaDelay db deletedSndChunkReplicaId delay = do
updatedAt <- getCurrentTime
DB.execute db "UPDATE deleted_snd_chunk_replicas SET delay = ?, retries = retries + 1, updated_at = ? WHERE deleted_snd_chunk_replica_id = ?" (delay, updatedAt, deletedSndChunkReplicaId)
deleteDeletedSndChunkReplica :: DB.Connection -> Int64 -> IO ()
deleteDeletedSndChunkReplica db deletedSndChunkReplicaId =
DB.execute db "DELETE FROM deleted_snd_chunk_replicas WHERE deleted_snd_chunk_replica_id = ?" (Only deletedSndChunkReplicaId)
getPendingDelFilesServers :: DB.Connection -> NominalDiffTime -> IO [XFTPServer]
getPendingDelFilesServers db ttl = do
cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime
map toXFTPServer
<$> DB.query
db
[sql|
SELECT DISTINCT
s.xftp_host, s.xftp_port, s.xftp_key_hash
FROM deleted_snd_chunk_replicas r
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
WHERE r.created_at >= ?
|]
(Only cutoffTs)
deleteDeletedSndChunkReplicasExpired :: DB.Connection -> NominalDiffTime -> IO ()
deleteDeletedSndChunkReplicasExpired db ttl = do
cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime
DB.execute db "DELETE FROM deleted_snd_chunk_replicas WHERE created_at < ?" (Only cutoffTs)
@@ -57,6 +57,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230120_delete_errors
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230217_server_key_hash
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230223_files
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230320_retry_state
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230401_snd_files
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Parsers (dropPrefix, sumTypeJSON)
import Simplex.Messaging.Transport.Client (TransportHost)
@@ -80,7 +81,8 @@ schemaMigrations =
("m20230120_delete_errors", m20230120_delete_errors, Nothing),
("m20230217_server_key_hash", m20230217_server_key_hash, Nothing),
("m20230223_files", m20230223_files, Just down_m20230223_files),
("m20230320_retry_state", m20230320_retry_state, Just down_m20230320_retry_state)
("m20230320_retry_state", m20230320_retry_state, Just down_m20230320_retry_state),
("m20230401_snd_files", m20230401_snd_files, Just down_m20230401_snd_files)
]
-- | The list of migrations in ascending order by date
@@ -5,21 +5,22 @@ module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230401_snd_files where
import Database.SQLite.Simple (Query)
import Database.SQLite.Simple.QQ (sql)
-- this migration is a draft - it is not included in the list of migrations
m20230401_snd_files :: Query
m20230401_snd_files =
[sql|
CREATE TABLE snd_files (
snd_file_id INTEGER PRIMARY KEY AUTOINCREMENT,
snd_file_id INTEGER PRIMARY KEY,
snd_file_entity_id BLOB NOT NULL,
user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE,
size INTEGER NOT NULL,
digest BLOB NOT NULL,
key BLOB NOT NULL,
nonce BLOB NOT NULL,
chunk_size INTEGER NOT NULL,
num_recipients INTEGER NOT NULL,
digest BLOB,
key BLOB NOT NUll,
nonce BLOB NOT NUll,
path TEXT NOT NULL,
enc_path TEXT,
prefix_path TEXT,
status TEXT NOT NULL,
deleted INTEGER NOT NULL DEFAULT 0,
error TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
@@ -32,18 +33,13 @@ CREATE TABLE snd_file_chunks (
chunk_no INTEGER NOT NULL,
chunk_offset INTEGER NOT NULL,
chunk_size INTEGER NOT NULL,
digest BLOB NOT NULL,
delay INTEGER,
digest BLOB,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX idx_snd_file_chunks_snd_file_id ON snd_file_chunks(snd_file_id);
-- ? add fk to snd_file_descriptions?
-- ? probably it's not necessary since these entities are
-- ? required at different stages of sending files -
-- ? replicas on upload, description on notifying client
CREATE TABLE snd_file_chunk_replicas (
snd_file_chunk_replica_id INTEGER PRIMARY KEY,
snd_file_chunk_id INTEGER NOT NULL REFERENCES snd_file_chunks ON DELETE CASCADE,
@@ -51,8 +47,8 @@ CREATE TABLE snd_file_chunk_replicas (
xftp_server_id INTEGER NOT NULL REFERENCES xftp_servers ON DELETE CASCADE,
replica_id BLOB NOT NULL,
replica_key BLOB NOT NULL,
-- created INTEGER NOT NULL DEFAULT 0, -- as in XFTP create - registered on server
uploaded INTEGER NOT NULL DEFAULT 0,
replica_status TEXT NOT NULL,
delay INTEGER,
retries INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
@@ -72,14 +68,39 @@ CREATE TABLE snd_file_chunk_replica_recipients (
CREATE INDEX idx_snd_file_chunk_replica_recipients_snd_file_chunk_replica_id ON snd_file_chunk_replica_recipients(snd_file_chunk_replica_id);
CREATE TABLE snd_file_descriptions (
snd_file_description_id INTEGER PRIMARY KEY,
snd_file_id INTEGER NOT NULL REFERENCES snd_files ON DELETE CASCADE,
sender INTEGER NOT NULL, -- 1 for sender file description
descr_text TEXT NOT NULL,
CREATE TABLE deleted_snd_chunk_replicas (
deleted_snd_chunk_replica_id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE,
xftp_server_id INTEGER NOT NULL REFERENCES xftp_servers ON DELETE CASCADE,
replica_id BLOB NOT NULL,
replica_key BLOB NOT NULL,
delay INTEGER,
retries INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX idx_snd_file_descriptions_snd_file_id ON snd_file_descriptions(snd_file_id);
CREATE INDEX idx_deleted_snd_chunk_replicas_user_id ON deleted_snd_chunk_replicas(user_id);
CREATE INDEX idx_deleted_snd_chunk_replicas_xftp_server_id ON deleted_snd_chunk_replicas(xftp_server_id);
|]
down_m20230401_snd_files :: Query
down_m20230401_snd_files =
[sql|
DROP INDEX idx_deleted_snd_chunk_replicas_xftp_server_id;
DROP INDEX idx_deleted_snd_chunk_replicas_user_id;
DROP TABLE deleted_snd_chunk_replicas;
DROP INDEX idx_snd_file_chunk_replica_recipients_snd_file_chunk_replica_id;
DROP TABLE snd_file_chunk_replica_recipients;
DROP INDEX idx_snd_file_chunk_replicas_snd_file_chunk_id;
DROP INDEX idx_snd_file_chunk_replicas_xftp_server_id;
DROP TABLE snd_file_chunk_replicas;
DROP INDEX idx_snd_file_chunks_snd_file_id;
DROP TABLE snd_file_chunks;
DROP INDEX idx_snd_files_user_id;
DROP TABLE snd_files;
|]
@@ -345,3 +345,78 @@ CREATE INDEX idx_rcv_file_chunk_replicas_rcv_file_chunk_id ON rcv_file_chunk_rep
CREATE INDEX idx_rcv_file_chunk_replicas_xftp_server_id ON rcv_file_chunk_replicas(
xftp_server_id
);
CREATE TABLE snd_files(
snd_file_id INTEGER PRIMARY KEY,
snd_file_entity_id BLOB NOT NULL,
user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE,
num_recipients INTEGER NOT NULL,
digest BLOB,
key BLOB NOT NUll,
nonce BLOB NOT NUll,
path TEXT NOT NULL,
prefix_path TEXT,
status TEXT NOT NULL,
deleted INTEGER NOT NULL DEFAULT 0,
error TEXT,
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
CREATE INDEX idx_snd_files_user_id ON snd_files(user_id);
CREATE TABLE snd_file_chunks(
snd_file_chunk_id INTEGER PRIMARY KEY,
snd_file_id INTEGER NOT NULL REFERENCES snd_files ON DELETE CASCADE,
chunk_no INTEGER NOT NULL,
chunk_offset INTEGER NOT NULL,
chunk_size INTEGER NOT NULL,
digest BLOB,
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
CREATE INDEX idx_snd_file_chunks_snd_file_id ON snd_file_chunks(snd_file_id);
CREATE TABLE snd_file_chunk_replicas(
snd_file_chunk_replica_id INTEGER PRIMARY KEY,
snd_file_chunk_id INTEGER NOT NULL REFERENCES snd_file_chunks ON DELETE CASCADE,
replica_number INTEGER NOT NULL,
xftp_server_id INTEGER NOT NULL REFERENCES xftp_servers ON DELETE CASCADE,
replica_id BLOB NOT NULL,
replica_key BLOB NOT NULL,
replica_status TEXT NOT NULL,
delay INTEGER,
retries INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
CREATE INDEX idx_snd_file_chunk_replicas_snd_file_chunk_id ON snd_file_chunk_replicas(
snd_file_chunk_id
);
CREATE INDEX idx_snd_file_chunk_replicas_xftp_server_id ON snd_file_chunk_replicas(
xftp_server_id
);
CREATE TABLE snd_file_chunk_replica_recipients(
snd_file_chunk_replica_recipient_id INTEGER PRIMARY KEY,
snd_file_chunk_replica_id INTEGER NOT NULL REFERENCES snd_file_chunk_replicas ON DELETE CASCADE,
rcv_replica_id BLOB NOT NULL,
rcv_replica_key BLOB NOT NULL,
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
CREATE INDEX idx_snd_file_chunk_replica_recipients_snd_file_chunk_replica_id ON snd_file_chunk_replica_recipients(
snd_file_chunk_replica_id
);
CREATE TABLE deleted_snd_chunk_replicas(
deleted_snd_chunk_replica_id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE,
xftp_server_id INTEGER NOT NULL REFERENCES xftp_servers ON DELETE CASCADE,
replica_id BLOB NOT NULL,
replica_key BLOB NOT NULL,
delay INTEGER,
retries INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
CREATE INDEX idx_deleted_snd_chunk_replicas_user_id ON deleted_snd_chunk_replicas(
user_id
);
CREATE INDEX idx_deleted_snd_chunk_replicas_xftp_server_id ON deleted_snd_chunk_replicas(
xftp_server_id
);
+1 -1
View File
@@ -556,7 +556,7 @@ testSuspendingAgent = do
5 <- sendMessage a bId SMP.noMsgFlags "hello 2"
get a ##> ("", bId, SENT 5)
Nothing <- 100000 `timeout` get b
activateAgent b
foregroundAgent b
get b =##> \case ("", c, Msg "hello 2") -> c == aId; _ -> False
testSuspendingAgentCompleteSending :: ATransport -> IO ()
+232 -98
View File
@@ -7,21 +7,24 @@
module XFTPAgent where
import AgentTests.FunctionalAPITests (get, getSMPAgentClient', rfGet, runRight, runRight_, sfGet)
import Control.Concurrent (threadDelay)
import Control.Logger.Simple
import Control.Monad.Except
import Data.Bifunctor (first)
import qualified Data.ByteString.Char8 as B
import Data.Int (Int64)
import Data.List (find, isSuffixOf)
import Data.Maybe (fromJust)
import SMPAgentClient (agentCfg, initAgentServers, testDB)
import Simplex.FileTransfer.Description
import Simplex.FileTransfer.Protocol (FileParty (..), XFTPErrorType (AUTH))
import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..))
import Simplex.Messaging.Agent (AgentClient, disconnectAgentClient, testProtocolServer, xftpDeleteRcvFile, xftpReceiveFile, xftpSendFile, xftpStartWorkers)
import Simplex.Messaging.Agent (AgentClient, disconnectAgentClient, testProtocolServer, xftpDeleteRcvFile, xftpDeleteSndFileInternal, xftpDeleteSndFileRemote, xftpReceiveFile, xftpSendFile, xftpStartWorkers)
import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestStep (..))
import Simplex.Messaging.Agent.Protocol (ACommand (..), AgentErrorType (..), BrokerErrorType (..), noAuthSrv)
import Simplex.Messaging.Agent.Protocol (ACommand (..), AgentErrorType (..), BrokerErrorType (..), RcvFileId, SndFileId, noAuthSrv)
import Simplex.Messaging.Encoding.String (StrEncoding (..))
import Simplex.Messaging.Protocol (BasicAuth, ProtoServerWithAuth (..), ProtocolServer (..), XFTPServerWithAuth)
import System.Directory (doesDirectoryExist, getFileSize, listDirectory)
import System.Directory (doesDirectoryExist, doesFileExist, getFileSize, listDirectory)
import System.FilePath ((</>))
import System.Timeout (timeout)
import Test.Hspec
@@ -30,10 +33,13 @@ import XFTPClient
xftpAgentTests :: Spec
xftpAgentTests = around_ testBracket . describe "Functional API" $ do
it "should receive file" testXFTPAgentReceive
it "should send and receive file" testXFTPAgentSendReceive
it "should resume receiving file after restart" testXFTPAgentReceiveRestore
it "should cleanup tmp path after permanent error" testXFTPAgentReceiveCleanup
it "should send file using experimental api" testXFTPAgentSendExperimental
it "should cleanup rcv tmp path after permanent error" testXFTPAgentReceiveCleanup
it "should resume sending file after restart" testXFTPAgentSendRestore
it "should cleanup snd prefix path after permanent error" testXFTPAgentSendCleanup
it "should delete sent file on server" testXFTPAgentDelete
it "should request additional recipient IDs when number of recipients exceeds maximum per request" testXFTPAgentRequestAdditionalRecipientIDs
describe "XFTP server test via agent API" $ do
it "should pass without basic auth" $ testXFTPServerTest Nothing (noAuthSrv testXFTPServer2) `shouldReturn` Nothing
let srv1 = testXFTPServer2 {keyHash = "1234"}
@@ -70,36 +76,54 @@ checkProgress (prev, expected) (progress, total) loop
| progress < total = loop progress
| otherwise = pure ()
testXFTPAgentReceive :: IO ()
testXFTPAgentReceive = withXFTPServer $ do
-- send file using CLI
testXFTPAgentSendReceive :: IO ()
testXFTPAgentSendReceive = withXFTPServer $ do
filePath <- createRandomFile
-- send file, delete snd file internally
sndr <- getSMPAgentClient' agentCfg initAgentServers testDB
(rfd1, rfd2) <- runRight $ do
(sfId, _, rfd1, rfd2) <- testSend sndr filePath
xftpDeleteSndFileInternal sndr 1 sfId
pure (rfd1, rfd2)
-- receive file, delete rcv file
testReceiveDelete rfd1 filePath
testReceiveDelete rfd2 filePath
where
testReceiveDelete rfd originalFilePath = do
rcp <- getSMPAgentClient' agentCfg initAgentServers testDB
runRight_ $ do
rfId <- testReceive rcp rfd originalFilePath
xftpDeleteRcvFile rcp 1 rfId
createRandomFile :: IO FilePath
createRandomFile = do
let filePath = senderFiles </> "testfile"
xftpCLI ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath]
file <- B.readFile filePath
getFileSize filePath `shouldReturn` mb 17
let fdRcv = filePath <> ".xftp" </> "rcv1.xftp"
fdSnd = filePath <> ".xftp" </> "snd.xftp.private"
progress : sendResult <- xftpCLI ["send", filePath, senderFiles, "-s", testXFTPServerStr, "--tmp=tests/tmp"]
progress `shouldSatisfy` uploadProgress
sendResult
`shouldBe` [ "Sender file description: " <> fdSnd,
"Pass file descriptions to the recipient(s):",
fdRcv
]
-- receive file using agent
rcp <- getSMPAgentClient' agentCfg initAgentServers testDB
runRight_ $ do
xftpStartWorkers rcp (Just recipientFiles)
fd :: ValidFileDescription 'FRecipient <- getFileDescription fdRcv
fId <- xftpReceiveFile rcp 1 fd
rfProgress rcp $ mb 18
("", fId', RFDONE path) <- rfGet rcp
liftIO $ do
fId' `shouldBe` fId
B.readFile path `shouldReturn` file
pure filePath
-- delete file
xftpDeleteRcvFile rcp 1 fId
testSend :: AgentClient -> FilePath -> ExceptT AgentErrorType IO (SndFileId, ValidFileDescription 'FSender, ValidFileDescription 'FRecipient, ValidFileDescription 'FRecipient)
testSend sndr filePath = do
xftpStartWorkers sndr (Just senderFiles)
sfId <- xftpSendFile sndr 1 filePath 2
sfProgress sndr $ mb 18
("", sfId', SFDONE sndDescr [rfd1, rfd2]) <- sfGet sndr
liftIO $ sfId' `shouldBe` sfId
pure (sfId, sndDescr, rfd1, rfd2)
testReceive :: AgentClient -> ValidFileDescription 'FRecipient -> FilePath -> ExceptT AgentErrorType IO RcvFileId
testReceive rcp rfd originalFilePath = do
xftpStartWorkers rcp (Just recipientFiles)
rfId <- xftpReceiveFile rcp 1 rfd
rfProgress rcp $ mb 18
("", rfId', RFDONE path) <- rfGet rcp
liftIO $ do
rfId' `shouldBe` rfId
file <- B.readFile originalFilePath
B.readFile path `shouldReturn` file
pure rfId
getFileDescription :: FilePath -> ExceptT AgentErrorType IO (ValidFileDescription 'FRecipient)
getFileDescription path =
@@ -110,30 +134,22 @@ logCfgNoLogs = LogConfig {lc_file = Nothing, lc_stderr = False}
testXFTPAgentReceiveRestore :: IO ()
testXFTPAgentReceiveRestore = withGlobalLogging logCfgNoLogs $ do
let filePath = senderFiles </> "testfile"
fdRcv = filePath <> ".xftp" </> "rcv1.xftp"
fdSnd = filePath <> ".xftp" </> "snd.xftp.private"
filePath <- createRandomFile
withXFTPServerStoreLogOn $ \_ -> do
-- send file using CLI
xftpCLI ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath]
getFileSize filePath `shouldReturn` mb 17
progress : sendResult <- xftpCLI ["send", filePath, senderFiles, "-s", testXFTPServerStr, "--tmp=tests/tmp"]
progress `shouldSatisfy` uploadProgress
sendResult
`shouldBe` [ "Sender file description: " <> fdSnd,
"Pass file descriptions to the recipient(s):",
fdRcv
]
rfd <- withXFTPServerStoreLogOn $ \_ -> do
-- send file
sndr <- getSMPAgentClient' agentCfg initAgentServers testDB
runRight $ do
(_, _, rfd, _) <- testSend sndr filePath
pure rfd
-- receive file using agent - should not succeed due to server being down
-- receive file - should not succeed with server down
rcp <- getSMPAgentClient' agentCfg initAgentServers testDB
fId <- runRight $ do
rfId <- runRight $ do
xftpStartWorkers rcp (Just recipientFiles)
fd :: ValidFileDescription 'FRecipient <- getFileDescription fdRcv
fId <- xftpReceiveFile rcp 1 fd
rfId <- xftpReceiveFile rcp 1 rfd
liftIO $ timeout 300000 (get rcp) `shouldReturn` Nothing -- wait for worker attempt
pure fId
pure rfId
disconnectAgentClient rcp
[prefixDir] <- listDirectory recipientFiles
@@ -141,13 +157,23 @@ testXFTPAgentReceiveRestore = withGlobalLogging logCfgNoLogs $ do
doesDirectoryExist tmpPath `shouldReturn` True
withXFTPServerStoreLogOn $ \_ -> do
-- receive file using agent - should succeed with server up
-- receive file - should start downloading with server up
rcp' <- getSMPAgentClient' agentCfg initAgentServers testDB
runRight_ $ xftpStartWorkers rcp' (Just recipientFiles)
("", rfId', RFPROG _ _) <- rfGet rcp'
liftIO $ rfId' `shouldBe` rfId
disconnectAgentClient rcp'
threadDelay 100000
withXFTPServerStoreLogOn $ \_ -> do
-- receive file - should continue downloading with server up
rcp' <- getSMPAgentClient' agentCfg initAgentServers testDB
runRight_ $ xftpStartWorkers rcp' (Just recipientFiles)
rfProgress rcp' $ mb 18
("", fId', RFDONE path) <- rfGet rcp'
("", rfId', RFDONE path) <- rfGet rcp'
liftIO $ do
fId' `shouldBe` fId
rfId' `shouldBe` rfId
file <- B.readFile filePath
B.readFile path `shouldReturn` file
@@ -156,30 +182,22 @@ testXFTPAgentReceiveRestore = withGlobalLogging logCfgNoLogs $ do
testXFTPAgentReceiveCleanup :: IO ()
testXFTPAgentReceiveCleanup = withGlobalLogging logCfgNoLogs $ do
let filePath = senderFiles </> "testfile"
fdRcv = filePath <> ".xftp" </> "rcv1.xftp"
fdSnd = filePath <> ".xftp" </> "snd.xftp.private"
filePath <- createRandomFile
withXFTPServerStoreLogOn $ \_ -> do
-- send file using CLI
xftpCLI ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath]
getFileSize filePath `shouldReturn` mb 17
progress : sendResult <- xftpCLI ["send", filePath, senderFiles, "-s", testXFTPServerStr, "--tmp=tests/tmp"]
progress `shouldSatisfy` uploadProgress
sendResult
`shouldBe` [ "Sender file description: " <> fdSnd,
"Pass file descriptions to the recipient(s):",
fdRcv
]
rfd <- withXFTPServerStoreLogOn $ \_ -> do
-- send file
sndr <- getSMPAgentClient' agentCfg initAgentServers testDB
runRight $ do
(_, _, rfd, _) <- testSend sndr filePath
pure rfd
-- receive file using agent - should not succeed due to server being down
-- receive file - should not succeed with server down
rcp <- getSMPAgentClient' agentCfg initAgentServers testDB
fId <- runRight $ do
rfId <- runRight $ do
xftpStartWorkers rcp (Just recipientFiles)
fd :: ValidFileDescription 'FRecipient <- getFileDescription fdRcv
fId <- xftpReceiveFile rcp 1 fd
rfId <- xftpReceiveFile rcp 1 rfd
liftIO $ timeout 300000 (get rcp) `shouldReturn` Nothing -- wait for worker attempt
pure fId
pure rfId
disconnectAgentClient rcp
[prefixDir] <- listDirectory recipientFiles
@@ -187,43 +205,159 @@ testXFTPAgentReceiveCleanup = withGlobalLogging logCfgNoLogs $ do
doesDirectoryExist tmpPath `shouldReturn` True
withXFTPServerThreadOn $ \_ -> do
-- receive file using agent - should fail with AUTH error
-- receive file - should fail with AUTH error
rcp' <- getSMPAgentClient' agentCfg initAgentServers testDB
runRight_ $ xftpStartWorkers rcp' (Just recipientFiles)
("", fId', RFERR (INTERNAL "XFTP {xftpErr = AUTH}")) <- rfGet rcp'
fId' `shouldBe` fId
("", rfId', RFERR (INTERNAL "XFTP {xftpErr = AUTH}")) <- rfGet rcp'
rfId' `shouldBe` rfId
-- tmp path should be removed after permanent error
doesDirectoryExist tmpPath `shouldReturn` False
testXFTPAgentSendExperimental :: IO ()
testXFTPAgentSendExperimental = withXFTPServer $ do
-- create random file using cli
let filePath = senderFiles </> "testfile"
xftpCLI ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath]
file <- B.readFile filePath
getFileSize filePath `shouldReturn` mb 17
testXFTPAgentSendRestore :: IO ()
testXFTPAgentSendRestore = withGlobalLogging logCfgNoLogs $ do
filePath <- createRandomFile
-- send file using experimental agent API
-- send file - should not succeed with server down
sndr <- getSMPAgentClient' agentCfg initAgentServers testDB
rfd <- runRight $ do
sfId <- runRight $ do
xftpStartWorkers sndr (Just senderFiles)
sfId <- xftpSendFile sndr 1 filePath 2
sfProgress sndr $ mb 18
("", sfId', SFDONE _sndDescr [rfd1, _rfd2]) <- sfGet sndr
liftIO $ sfId' `shouldBe` sfId
pure rfd1
liftIO $ timeout 1000000 (get sndr) `shouldReturn` Nothing -- wait for worker to encrypt and attempt to create file
pure sfId
disconnectAgentClient sndr
-- receive file using agent
rcp <- getSMPAgentClient' agentCfg initAgentServers testDB
runRight_ $ do
xftpStartWorkers rcp (Just recipientFiles)
rfId <- xftpReceiveFile rcp 1 rfd
rfProgress rcp $ mb 18
("", rfId', RFDONE path) <- rfGet rcp
dirEntries <- listDirectory senderFiles
let prefixDir = fromJust $ find (isSuffixOf "_snd.xftp") dirEntries
prefixPath = senderFiles </> prefixDir
encPath = prefixPath </> "xftp.encrypted"
doesDirectoryExist prefixPath `shouldReturn` True
doesFileExist encPath `shouldReturn` True
withXFTPServerStoreLogOn $ \_ -> do
-- send file - should start uploading with server up
sndr' <- getSMPAgentClient' agentCfg initAgentServers testDB
runRight_ $ xftpStartWorkers sndr' (Just senderFiles)
("", sfId', SFPROG _ _) <- sfGet sndr'
liftIO $ sfId' `shouldBe` sfId
disconnectAgentClient sndr'
threadDelay 100000
withXFTPServerStoreLogOn $ \_ -> do
-- send file - should continue uploading with server up
sndr' <- getSMPAgentClient' agentCfg initAgentServers testDB
runRight_ $ xftpStartWorkers sndr' (Just senderFiles)
sfProgress sndr' $ mb 18
("", sfId', SFDONE _sndDescr [rfd1, _rfd2]) <- sfGet sndr'
liftIO $ sfId' `shouldBe` sfId
-- prefix path should be removed after sending file
doesDirectoryExist prefixPath `shouldReturn` False
doesFileExist encPath `shouldReturn` False
-- receive file
rcp <- getSMPAgentClient' agentCfg initAgentServers testDB
runRight_ $
void $ testReceive rcp rfd1 filePath
testXFTPAgentSendCleanup :: IO ()
testXFTPAgentSendCleanup = withGlobalLogging logCfgNoLogs $ do
filePath <- createRandomFile
sfId <- withXFTPServerStoreLogOn $ \_ -> do
-- send file
sndr <- getSMPAgentClient' agentCfg initAgentServers testDB
sfId <- runRight $ do
xftpStartWorkers sndr (Just senderFiles)
sfId <- xftpSendFile sndr 1 filePath 2
-- wait for progress events for 5 out of 6 chunks - at this point all chunks should be created on the server
forM_ [1 .. 5 :: Integer] $ \_ -> do
(_, _, SFPROG _ _) <- sfGet sndr
pure ()
pure sfId
disconnectAgentClient sndr
pure sfId
dirEntries <- listDirectory senderFiles
let prefixDir = fromJust $ find (isSuffixOf "_snd.xftp") dirEntries
prefixPath = senderFiles </> prefixDir
encPath = prefixPath </> "xftp.encrypted"
doesDirectoryExist prefixPath `shouldReturn` True
doesFileExist encPath `shouldReturn` True
withXFTPServerThreadOn $ \_ -> do
-- send file - should fail with AUTH error
sndr' <- getSMPAgentClient' agentCfg initAgentServers testDB
runRight_ $ xftpStartWorkers sndr' (Just senderFiles)
("", sfId', SFERR (INTERNAL "XFTP {xftpErr = AUTH}")) <- sfGet sndr'
sfId' `shouldBe` sfId
-- prefix path should be removed after permanent error
doesDirectoryExist prefixPath `shouldReturn` False
doesFileExist encPath `shouldReturn` False
testXFTPAgentDelete :: IO ()
testXFTPAgentDelete = withGlobalLogging logCfgNoLogs $
withXFTPServer $ do
filePath <- createRandomFile
-- send file
sndr <- getSMPAgentClient' agentCfg initAgentServers testDB
(sfId, sndDescr, rfd1, rfd2) <- runRight $ testSend sndr filePath
-- receive file
rcp1 <- getSMPAgentClient' agentCfg initAgentServers testDB
runRight_ $
void $ testReceive rcp1 rfd1 filePath
length <$> listDirectory xftpServerFiles `shouldReturn` 6
-- delete file
runRight $ do
xftpStartWorkers sndr (Just senderFiles)
xftpDeleteSndFileRemote sndr 1 sfId sndDescr
Nothing <- liftIO $ 100000 `timeout` sfGet sndr
pure ()
threadDelay 1000000
length <$> listDirectory xftpServerFiles `shouldReturn` 0
-- receive file - should fail with AUTH error
rcp2 <- getSMPAgentClient' agentCfg initAgentServers testDB
runRight $ do
xftpStartWorkers rcp2 (Just recipientFiles)
rfId <- xftpReceiveFile rcp2 1 rfd2
("", rfId', RFERR (INTERNAL "XFTP {xftpErr = AUTH}")) <- rfGet rcp2
liftIO $ rfId' `shouldBe` rfId
testXFTPAgentRequestAdditionalRecipientIDs :: IO ()
testXFTPAgentRequestAdditionalRecipientIDs = withXFTPServer $ do
filePath <- createRandomFile
-- send file
sndr <- getSMPAgentClient' agentCfg initAgentServers testDB
rfds <- runRight $ do
xftpStartWorkers sndr (Just senderFiles)
sfId <- xftpSendFile sndr 1 filePath 500
sfProgress sndr $ mb 18
("", sfId', SFDONE _sndDescr rfds) <- sfGet sndr
liftIO $ do
rfId' `shouldBe` rfId
B.readFile path `shouldReturn` file
sfId' `shouldBe` sfId
length rfds `shouldBe` 500
pure rfds
-- receive file using different descriptions
-- ! revise number of recipients and indexes if xftpMaxRecipientsPerRequest is changed
testReceive' (head rfds) filePath
testReceive' (rfds !! 99) filePath
testReceive' (rfds !! 299) filePath
testReceive' (rfds !! 499) filePath
where
testReceive' rfd originalFilePath = do
rcp <- getSMPAgentClient' agentCfg initAgentServers testDB
runRight_ $
void $ testReceive rcp rfd originalFilePath
testXFTPServerTest :: Maybe BasicAuth -> XFTPServerWithAuth -> IO (Maybe ProtocolTestFailure)
testXFTPServerTest newFileBasicAuth srv =