xftp: recoverable send - spike (#707)

This commit is contained in:
spaced4ndy
2023-04-07 11:19:50 +04:00
committed by GitHub
parent 9f8db13553
commit d8e60ecfdb
10 changed files with 666 additions and 183 deletions

View File

@@ -19,7 +19,7 @@ module Simplex.FileTransfer.Agent
deleteRcvFile,
-- Sending files
sendFileExperimental,
_sendFile,
sendFile,
)
where
@@ -33,18 +33,21 @@ import Data.Bifunctor (first)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Base64.URL as U
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as LB
import Data.Composition ((.:))
import Data.Int (Int64)
import Data.List (foldl', isSuffixOf, partition)
import Data.List.NonEmpty (nonEmpty)
import qualified Data.List.NonEmpty as L
import qualified Data.Map.Strict as M
import Data.Text (Text)
import Data.Time.Clock (getCurrentTime)
import Data.Time.Format (defaultTimeLocale, formatTime)
import Simplex.FileTransfer.Client.Main (CLIError, SendOptions (..), cliSendFileOpts)
import Simplex.FileTransfer.Client (XFTPChunkSpec (..))
import Simplex.FileTransfer.Client.Main
import Simplex.FileTransfer.Crypto
import Simplex.FileTransfer.Description
import Simplex.FileTransfer.Protocol (FileParty (..), FilePartyI)
import Simplex.FileTransfer.Protocol (FileInfo (..), FileParty (..), FilePartyI, SFileParty (..))
import Simplex.FileTransfer.Transport (XFTPRcvChunkSpec (..))
import Simplex.FileTransfer.Types
import Simplex.FileTransfer.Util (removePath, uniqueCombine)
@@ -53,8 +56,12 @@ import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Store.SQLite
import qualified Simplex.Messaging.Crypto as C
import qualified Simplex.Messaging.Crypto.Lazy as LC
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (EntityId, XFTPServer, XFTPServerWithAuth)
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util (liftError, liftIOEither, tshow, whenM)
import System.FilePath (takeFileName, (</>))
@@ -72,24 +79,25 @@ startWorkers c workDir = do
startFiles = do
rcvFilesTTL <- asks (rcvFilesTTL . config)
pendingRcvServers <- withStore' c (`getPendingRcvFilesServers` rcvFilesTTL)
forM_ pendingRcvServers $ \s -> addXFTPWorker c (Just s)
forM_ pendingRcvServers $ \s -> addXFTPRcvWorker c (Just s)
-- start local worker for files pending decryption,
-- no need to make an extra query for the check
-- as the worker will check the store anyway
addXFTPWorker c Nothing
addXFTPRcvWorker c Nothing
closeXFTPAgent :: MonadUnliftIO m => XFTPAgent -> m ()
closeXFTPAgent XFTPAgent {xftpWorkers} = do
ws <- atomically $ stateTVar xftpWorkers (,M.empty)
mapM_ (uninterruptibleCancel . snd) ws
closeXFTPAgent XFTPAgent {xftpRcvWorkers, xftpSndWorkers} = do
stopWorkers xftpRcvWorkers
stopWorkers xftpSndWorkers
where
stopWorkers wsSel = do
ws <- atomically $ stateTVar wsSel (,M.empty)
mapM_ (uninterruptibleCancel . snd) ws
receiveFile :: AgentMonad m => AgentClient -> UserId -> ValidFileDescription 'FRecipient -> m RcvFileId
receiveFile c userId (ValidFileDescription fd@FileDescription {chunks}) = do
g <- asks idsDrg
workPath <- getXFTPWorkPath
ts <- liftIO getCurrentTime
let isoTime = formatTime defaultTimeLocale "%Y%m%d_%H%M%S_%6q" ts
prefixPath <- uniqueCombine workPath (isoTime <> "_rcv.xftp")
prefixPath <- getPrefixPath "rcv.xftp"
createDirectory prefixPath
let relPrefixPath = takeFileName prefixPath
relTmpPath = relPrefixPath </> "xftp.encrypted"
@@ -102,9 +110,16 @@ receiveFile c userId (ValidFileDescription fd@FileDescription {chunks}) = do
where
downloadChunk :: AgentMonad m => FileChunk -> m ()
downloadChunk FileChunk {replicas = (FileChunkReplica {server} : _)} = do
addXFTPWorker c (Just server)
addXFTPRcvWorker c (Just server)
downloadChunk _ = throwError $ INTERNAL "no replicas"
getPrefixPath :: AgentMonad m => String -> m FilePath
getPrefixPath suffix = do
workPath <- getXFTPWorkPath
ts <- liftIO getCurrentTime
let isoTime = formatTime defaultTimeLocale "%Y%m%d_%H%M%S_%6q" ts
uniqueCombine workPath (isoTime <> "_" <> suffix)
toFSFilePath :: AgentMonad m => FilePath -> m FilePath
toFSFilePath f = (</> f) <$> getXFTPWorkPath
@@ -113,22 +128,32 @@ createEmptyFile fPath = do
h <- openFile fPath AppendMode
liftIO $ B.hPut h "" >> hFlush h
addXFTPWorker :: AgentMonad m => AgentClient -> Maybe XFTPServer -> m ()
addXFTPWorker c srv_ = do
ws <- asks $ xftpWorkers . xftpAgent
addXFTPRcvWorker :: AgentMonad m => AgentClient -> Maybe XFTPServer -> m ()
addXFTPRcvWorker c = addWorker c xftpRcvWorkers runXFTPRcvWorker runXFTPRcvLocalWorker
addWorker ::
AgentMonad m =>
AgentClient ->
(XFTPAgent -> TMap (Maybe XFTPServer) (TMVar (), Async ())) ->
(AgentClient -> XFTPServer -> TMVar () -> m ()) ->
(AgentClient -> TMVar () -> m ()) ->
Maybe XFTPServer ->
m ()
addWorker c wsSel runWorker runWorkerNoSrv srv_ = do
ws <- asks $ wsSel . xftpAgent
atomically (TM.lookup srv_ ws) >>= \case
Nothing -> do
doWork <- newTMVarIO ()
let runWorker = case srv_ of
Just srv -> runXFTPWorker c srv doWork
Nothing -> runXFTPLocalWorker c doWork
worker <- async $ runWorker `E.finally` atomically (TM.delete srv_ ws)
let runWorker' = case srv_ of
Just srv -> runWorker c srv doWork
Nothing -> runWorkerNoSrv c doWork
worker <- async $ runWorker' `E.finally` atomically (TM.delete srv_ ws)
atomically $ TM.insert srv_ (doWork, worker) ws
Just (doWork, _) ->
void . atomically $ tryPutTMVar doWork ()
runXFTPWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m ()
runXFTPWorker c srv doWork = do
runXFTPRcvWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m ()
runXFTPRcvWorker c srv doWork = do
forever $ do
void . atomically $ readTMVar doWork
agentOperationBracket c AORcvNetwork waitUntilActive runXftpOperation
@@ -140,30 +165,20 @@ runXFTPWorker c srv doWork = do
nextChunk <- withStore' c $ \db -> getNextRcvChunkToDownload db srv rcvFilesTTL
case nextChunk of
Nothing -> noWorkToDo
Just RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []} -> workerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) "chunk has no replicas"
Just fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, delay} : _} -> do
Just RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []} -> rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) "chunk has no replicas"
Just fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, delay} : _} -> do
ri <- asks $ reconnectInterval . config
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
withRetryInterval ri' $ \delay' loop ->
downloadFileChunk fc replica
`catchError` retryOnError delay' loop (workerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) . show)
`catchError` \e -> retryOnError c AORcvNetwork "XFTP rcv worker" loop (retryMaintenance e delay') (retryDone e) e
where
retryOnError :: Int64 -> m () -> (AgentErrorType -> m ()) -> AgentErrorType -> m ()
retryOnError replicaDelay loop done e = do
logError $ "XFTP worker error: " <> tshow e
if temporaryAgentError e
then retryLoop
else done e
where
retryLoop = do
notifyOnRetry <- asks (xftpNotifyErrsOnRetry . config)
when notifyOnRetry $ notifyInternalError c rcvFileEntityId $ show e
closeXFTPServerClient c userId replica
withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay
atomically $ endAgentOperation c AORcvNetwork
atomically $ throwWhenInactive c
atomically $ beginAgentOperation c AORcvNetwork
loop
retryMaintenance e replicaDelay = do
notifyOnRetry <- asks (xftpNotifyErrsOnRetry . config)
when notifyOnRetry $ notify c rcvFileEntityId $ RFERR e
closeXFTPServerClient c userId server replicaId
withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay
retryDone e = rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) (show e)
downloadFileChunk :: RcvFileChunk -> RcvFileChunkReplica -> m ()
downloadFileChunk RcvFileChunk {userId, rcvFileId, rcvFileEntityId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath} replica = do
fsFileTmpPath <- toFSFilePath fileTmpPath
@@ -178,8 +193,8 @@ runXFTPWorker c srv doWork = do
complete = all chunkReceived chunks
liftIO . when complete $ updateRcvFileStatus db rcvFileId RFSReceived
pure (complete, RFPROG rcvd total)
liftIO $ notify c rcvFileEntityId progress
when complete $ addXFTPWorker c Nothing
notify c rcvFileEntityId progress
when complete $ addXFTPRcvWorker c Nothing
where
receivedSize :: [RcvFileChunk] -> Int64
receivedSize = foldl' (\sz ch -> sz + receivedChunkSize ch) 0
@@ -188,17 +203,28 @@ runXFTPWorker c srv doWork = do
| otherwise = 0
chunkReceived RcvFileChunk {replicas} = any received replicas
workerInternalError :: AgentMonad m => AgentClient -> DBRcvFileId -> RcvFileId -> Maybe FilePath -> String -> m ()
workerInternalError c rcvFileId rcvFileEntityId tmpPath internalErrStr = do
retryOnError :: AgentMonad m => AgentClient -> AgentOperation -> Text -> m () -> m () -> m () -> AgentErrorType -> m ()
retryOnError c agentOp name loop maintenance done e = do
logError $ name <> " error: " <> tshow e
if temporaryAgentError e
then retryLoop
else done
where
retryLoop = do
maintenance `catchError` \_ -> pure ()
atomically $ endAgentOperation c agentOp
atomically $ throwWhenInactive c
atomically $ beginAgentOperation c agentOp
loop
rcvWorkerInternalError :: AgentMonad m => AgentClient -> DBRcvFileId -> RcvFileId -> Maybe FilePath -> String -> m ()
rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath internalErrStr = do
forM_ tmpPath (removePath <=< toFSFilePath)
withStore' c $ \db -> updateRcvFileError db rcvFileId internalErrStr
notifyInternalError c rcvFileEntityId internalErrStr
notify c rcvFileEntityId $ RFERR $ INTERNAL internalErrStr
notifyInternalError :: (MonadUnliftIO m) => AgentClient -> RcvFileId -> String -> m ()
notifyInternalError AgentClient {subQ} rcvFileEntityId internalErrStr = atomically $ writeTBQueue subQ ("", rcvFileEntityId, APC SAERcvFile $ RFERR $ INTERNAL internalErrStr)
runXFTPLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m ()
runXFTPLocalWorker c doWork = do
runXFTPRcvLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m ()
runXFTPRcvLocalWorker c doWork = do
forever $ do
void . atomically $ readTMVar doWork
-- TODO agentOperationBracket?
@@ -211,7 +237,7 @@ runXFTPLocalWorker c doWork = do
case nextFile of
Nothing -> noWorkToDo
Just f@RcvFile {rcvFileId, rcvFileEntityId, tmpPath} ->
decryptFile f `catchError` (workerInternalError c rcvFileId rcvFileEntityId tmpPath . show)
decryptFile f `catchError` (rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath . show)
noWorkToDo = void . atomically $ tryTakeTMVar doWork
decryptFile :: RcvFile -> m ()
decryptFile RcvFile {rcvFileId, rcvFileEntityId, key, nonce, tmpPath, savePath, status, chunks} = do
@@ -222,9 +248,9 @@ runXFTPLocalWorker c doWork = do
chunkPaths <- getChunkPaths chunks
encSize <- liftIO $ foldM (\s path -> (s +) . fromIntegral <$> getFileSize path) 0 chunkPaths
void $ liftError (INTERNAL . show) $ decryptChunks encSize chunkPaths key nonce $ \_ -> pure fsSavePath
notify c rcvFileEntityId $ RFDONE fsSavePath
forM_ tmpPath (removePath <=< toFSFilePath)
withStore' c (`updateRcvFileComplete` rcvFileId)
liftIO $ notify c rcvFileEntityId $ RFDONE fsSavePath
where
getChunkPaths :: [RcvFileChunk] -> m [FilePath]
getChunkPaths [] = pure []
@@ -264,7 +290,7 @@ sendFileExperimental c@AgentClient {xftpServers} userId filePath numRecipients =
createDirectoryIfMissing False tempPath
runSend fileName outputDir tempPath `catchError` \e -> do
cleanup outputDir tempPath
liftIO $ notify c sndFileId $ SFERR e
notify c sndFileId $ SFERR e
where
runSend :: String -> FilePath -> FilePath -> m ()
runSend fileName outputDir tempPath = do
@@ -281,7 +307,7 @@ sendFileExperimental c@AgentClient {xftpServers} userId filePath numRecipients =
liftCLI $ cliSendFileOpts sendOptions False $ notify c sndFileId .: SFPROG
(sndDescr, rcvDescrs) <- readDescrs outputDir fileName
cleanup outputDir tempPath
liftIO $ notify c sndFileId $ SFDONE sndDescr rcvDescrs
notify c sndFileId $ SFDONE sndDescr rcvDescrs
cleanup :: FilePath -> FilePath -> m ()
cleanup outputDir tempPath = do
removePath tempPath
@@ -299,71 +325,173 @@ sendFileExperimental c@AgentClient {xftpServers} userId filePath numRecipients =
readDescr :: FilePartyI p => FilePath -> m (ValidFileDescription p)
readDescr f = liftIOEither $ first INTERNAL . strDecode <$> B.readFile f
notify :: forall e. AEntityI e => AgentClient -> EntityId -> ACommand 'Agent e -> IO ()
notify :: forall m e. (MonadUnliftIO m, AEntityI e) => AgentClient -> EntityId -> ACommand 'Agent e -> m ()
notify c entId cmd = atomically $ writeTBQueue (subQ c) ("", entId, APC (sAEntity @e) cmd)
-- _sendFile :: AgentMonad m => AgentClient -> UserId -> FilePath -> Int -> m SndFileId
_sendFile :: AgentClient -> UserId -> FilePath -> Int -> m SndFileId
_sendFile _c _userId _filePath _numRecipients = do
-- db: create file in status New without chunks
-- add local snd worker for encryption
-- return file id to client
undefined
sendFile :: AgentMonad m => AgentClient -> UserId -> FilePath -> Int -> m SndFileId
sendFile c userId filePath numRecipients = do
g <- asks idsDrg
prefixPath <- getPrefixPath "snd.xftp"
createDirectory prefixPath
let relPrefixPath = takeFileName prefixPath
key <- liftIO C.randomSbKey
nonce <- liftIO C.randomCbNonce
-- saving absolute filePath will not allow to restore file encryption after app update, but it's a short window
fId <- withStore c $ \db -> createSndFile db g userId numRecipients filePath relPrefixPath key nonce
addXFTPSndWorker c Nothing
pure fId
_runXFTPSndLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m ()
_runXFTPSndLocalWorker _c doWork = do
addXFTPSndWorker :: AgentMonad m => AgentClient -> Maybe XFTPServer -> m ()
addXFTPSndWorker c = addWorker c xftpSndWorkers runXFTPSndWorker runXFTPSndPrepareWorker
runXFTPSndPrepareWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m ()
runXFTPSndPrepareWorker c doWork = do
forever $ do
void . atomically $ readTMVar doWork
-- TODO agentOperationBracket
runXftpOperation
where
runXftpOperation :: m ()
runXftpOperation = do
-- db: get next snd file to encrypt (in status New)
-- ? (or Encrypted to retry create? - see below)
-- with fixed retries (?) encryptFile
undefined
_encryptFile :: SndFile -> m ()
_encryptFile _sndFile = do
-- if enc path exists, remove it
-- if enc path doesn't exist:
-- - choose enc path
-- - touch file, db: update enc path (?)
-- calculate chunk sizes, encrypt file to enc path
-- calculate digest
-- prepare chunk specs
-- db:
-- - update file status to Encrypted
-- - create chunks according to chunk specs
-- ? since which servers are online is unknown,
-- ? we can't blindly assign servers to replicas.
-- ? should we XFTP create chunks on servers here,
-- ? with retrying for different servers,
-- ? keeping a list of servers that were tried?
-- ? then we can add replicas to chunks in db
-- ? and update file status to Uploading,
-- ? probably in same transaction as creating chunks,
-- ? and add XFTP snd workers for uploading chunks.
undefined
nextFile <- withStore' c getNextSndFileToPrepare
case nextFile of
Nothing -> noWorkToDo
Just f@SndFile {sndFileId, sndFileEntityId, prefixPath} ->
prepareFile f `catchError` (sndWorkerInternalError c sndFileId sndFileEntityId prefixPath . show)
noWorkToDo = void . atomically $ tryTakeTMVar doWork
prepareFile :: SndFile -> m ()
prepareFile SndFile {prefixPath = Nothing} =
throwError $ INTERNAL "no prefix path"
prepareFile sndFile@SndFile {sndFileId, prefixPath = Just ppath, status} = do
SndFile {numRecipients, chunks} <-
if status /= SFSEncrypted -- status is SFSNew or SFSEncrypting
then do
let encPath = sndFileEncPath ppath
fsEncPath <- toFSFilePath encPath
when (status == SFSEncrypting) $
whenM (doesFileExist fsEncPath) $ removeFile fsEncPath
withStore' c $ \db -> updateSndFileStatus db sndFileId SFSEncrypting
(digest, chunkSpecs) <- encryptFileForUpload sndFile encPath
withStore c $ \db -> do
updateSndFileEncrypted db sndFileId digest chunkSpecs
getSndFile db sndFileId
else pure sndFile
maxRecipients <- asks (xftpMaxRecipientsPerRequest . config)
let numRecipients' = min numRecipients maxRecipients
-- concurrently?
forM_ chunks $ createChunk numRecipients'
withStore' c $ \db -> updateSndFileStatus db sndFileId SFSUploading
where
encryptFileForUpload :: SndFile -> FilePath -> m (FileDigest, [XFTPChunkSpec])
encryptFileForUpload SndFile {key, nonce, filePath} encPath = do
let fileName = takeFileName filePath
fileSize <- fromInteger <$> getFileSize filePath
when (fileSize > maxFileSize) $ throwError $ INTERNAL "max file size exceeded"
let fileHdr = smpEncode FileHeader {fileName, fileExtra = Nothing}
fileSize' = fromIntegral (B.length fileHdr) + fileSize
chunkSizes = prepareChunkSizes $ fileSize' + fileSizeLen + authTagSize
chunkSizes' = map fromIntegral chunkSizes
encSize = sum chunkSizes'
void $ liftError (INTERNAL . show) $ encryptFile filePath fileHdr key nonce fileSize' encSize encPath
digest <- liftIO $ LC.sha512Hash <$> LB.readFile encPath
let chunkSpecs = prepareChunkSpecs encPath chunkSizes
pure (FileDigest digest, chunkSpecs)
createChunk :: Int -> SndFileChunk -> m ()
createChunk numRecipients' SndFileChunk {sndChunkId, userId, chunkSpec} = do
(sndKey, spKey) <- liftIO $ C.generateSignatureKeyPair C.SEd25519
rKeys <- liftIO $ L.fromList <$> replicateM numRecipients' (C.generateSignatureKeyPair C.SEd25519)
ch@FileInfo {digest} <- liftIO $ getChunkInfo sndKey chunkSpec
-- TODO with retry on temporary errors
srvAuth@(ProtoServerWithAuth srv _) <- getServer
(sndId, rIds) <- agentXFTPCreateChunk c userId srvAuth spKey ch (L.map fst rKeys)
let rcvIdsKeys = L.toList $ L.map ChunkReplicaId rIds `L.zip` L.map snd rKeys
withStore' c $ \db -> createSndFileReplica db sndChunkId (FileDigest digest) srv (ChunkReplicaId sndId) spKey rcvIdsKeys
addXFTPSndWorker c $ Just srv
getServer :: m XFTPServerWithAuth
getServer = do
-- TODO get user servers from config
-- TODO choose next server (per chunk? per file?)
undefined
_runXFTPSndWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m ()
_runXFTPSndWorker c _srv doWork = do
sndWorkerInternalError :: AgentMonad m => AgentClient -> DBSndFileId -> SndFileId -> Maybe FilePath -> String -> m ()
sndWorkerInternalError c sndFileId sndFileEntityId prefixPath internalErrStr = do
forM_ prefixPath (removePath <=< toFSFilePath)
withStore' c $ \db -> updateSndFileError db sndFileId internalErrStr
notify c sndFileEntityId $ SFERR $ INTERNAL internalErrStr
runXFTPSndWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m ()
runXFTPSndWorker c srv doWork = do
forever $ do
void . atomically $ readTMVar doWork
agentOperationBracket c AOSndNetwork throwWhenInactive runXftpOperation
where
noWorkToDo = void . atomically $ tryTakeTMVar doWork
runXftpOperation :: m ()
runXftpOperation = do
-- db: get next snd chunk to upload (replica is not uploaded)
-- with retry interval uploadChunk
-- - with fixed retries, repeat N times:
-- check if other files are in upload, delay (see xftpSndFiles in XFTPAgent)
undefined
_uploadFileChunk :: SndFileChunk -> m ()
_uploadFileChunk _sndFileChunk = do
-- add file id to xftpSndFiles
-- XFTP upload chunk
-- db: update replica status to Uploaded, return SndFile
-- if all SndFile's replicas are uploaded:
-- - serialize file descriptions and notify client
-- - remove file id from xftpSndFiles
undefined
nextChunk <- withStore' c $ \db -> getNextSndChunkToUpload db srv
case nextChunk of
Nothing -> noWorkToDo
Just SndFileChunk {sndFileId, sndFileEntityId, filePrefixPath, replicas = []} -> sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) "chunk has no replicas"
Just fc@SndFileChunk {userId, sndFileId, sndFileEntityId, filePrefixPath, replicas = replica@SndFileChunkReplica {sndChunkReplicaId, server, replicaId, delay} : _} -> do
ri <- asks $ reconnectInterval . config
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
withRetryInterval ri' $ \delay' loop ->
uploadFileChunk fc replica
`catchError` \e -> retryOnError c AOSndNetwork "XFTP snd worker" loop (retryMaintenance e delay') (retryDone e) e
where
retryMaintenance e replicaDelay = do
notifyOnRetry <- asks (xftpNotifyErrsOnRetry . config)
when notifyOnRetry $ notify c sndFileEntityId $ SFERR e
closeXFTPServerClient c userId server replicaId
withStore' c $ \db -> updateRcvChunkReplicaDelay db sndChunkReplicaId replicaDelay
retryDone e = sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) (show e)
uploadFileChunk :: SndFileChunk -> SndFileChunkReplica -> m ()
uploadFileChunk sndFileChunk@SndFileChunk {sndFileId, userId, chunkSpec} replica = do
replica'@SndFileChunkReplica {sndChunkReplicaId} <- addRecipients sndFileChunk replica
agentXFTPUploadChunk c userId replica' chunkSpec
sf@SndFile {sndFileEntityId, prefixPath, chunks} <- withStore c $ \db -> do
updateSndChunkReplicaStatus db sndChunkReplicaId SFRSUploaded
getSndFile db sndFileId
let complete = all chunkUploaded chunks
-- TODO calculate progress, notify SFPROG
when complete $ do
sndDescr <- sndFileToSndDescr sf
rcvDescrs <- sndFileToRcvDescrs sf
notify c sndFileEntityId $ SFDONE sndDescr rcvDescrs
forM_ prefixPath (removePath <=< toFSFilePath)
withStore' c $ \db -> updateSndFileStatus db sndFileId SFSComplete
where
addRecipients :: SndFileChunk -> SndFileChunkReplica -> m SndFileChunkReplica
addRecipients ch@SndFileChunk {numRecipients} cr@SndFileChunkReplica {rcvIdsKeys}
| length rcvIdsKeys > numRecipients = throwError $ INTERNAL "too many recipients"
| length rcvIdsKeys == numRecipients = pure cr
| otherwise = do
maxRecipients <- asks (xftpMaxRecipientsPerRequest . config)
let numRecipients' = min (numRecipients - length rcvIdsKeys) maxRecipients
rKeys <- liftIO $ L.fromList <$> replicateM numRecipients' (C.generateSignatureKeyPair C.SEd25519)
rIds <- agentXFTPAddRecipients c userId cr (L.map fst rKeys)
let rcvIdsKeys' = L.toList $ L.map ChunkReplicaId rIds `L.zip` L.map snd rKeys
cr' <- withStore' c $ \db -> addSndChunkReplicaRecipients db cr rcvIdsKeys'
addRecipients ch cr'
sndFileToSndDescr :: SndFile -> m (ValidFileDescription 'FSender)
sndFileToSndDescr SndFile {digest = Nothing} = throwError $ INTERNAL "snd file has no digest"
sndFileToSndDescr SndFile {chunks = []} = throwError $ INTERNAL "snd file has no chunks"
sndFileToSndDescr SndFile {digest = Just digest, key, nonce, chunks = chunks@(fstChunk : _)} = do
let chunkSize = FileSize $ sndChunkSize fstChunk
size = FileSize $ sum $ map (fromIntegral . sndChunkSize) chunks
descrChunks <- mapM toDescrChunk chunks
let fd = FileDescription {party = SFSender, size, digest, key, nonce, chunkSize, chunks = descrChunks}
either (throwError . INTERNAL) pure $ validateFileDescription' fd
where
toDescrChunk :: SndFileChunk -> m FileChunk
toDescrChunk SndFileChunk {digest = Nothing} = throwError $ INTERNAL "snd file chunk has no digest"
toDescrChunk SndFileChunk {replicas = []} = throwError $ INTERNAL "snd file chunk has no replicas"
toDescrChunk ch@SndFileChunk {chunkNo, digest = Just chDigest, replicas = (SndFileChunkReplica {server, replicaId, replicaKey} : _)} = do
let chunkSize = FileSize $ sndChunkSize ch
replicas = [FileChunkReplica {server, replicaId, replicaKey}]
pure FileChunk {chunkNo, digest = chDigest, chunkSize, replicas}
sndFileToRcvDescrs :: SndFile -> m [ValidFileDescription 'FRecipient]
sndFileToRcvDescrs SndFile {} = do
undefined
chunkUploaded SndFileChunk {replicas} =
any (\SndFileChunkReplica {replicaStatus} -> replicaStatus == SFRSUploaded) replicas

View File

@@ -16,6 +16,13 @@ module Simplex.FileTransfer.Client.Main
cliSendFile,
cliSendFileOpts,
prepareChunkSizes,
prepareChunkSpecs,
chunkSize1,
chunkSize2,
chunkSize3,
maxFileSize,
fileSizeLen,
getChunkInfo,
)
where
@@ -332,12 +339,6 @@ cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, re
let recipients = L.toList $ L.map ChunkReplicaId rIds `L.zip` L.map snd rKeys
replicas = [SentFileChunkReplica {server = xftpServer, recipients}]
pure (chunkNo, SentFileChunk {chunkNo, sndId, sndPrivateKey = spKey, chunkSize = FileSize $ fromIntegral chunkSize, digest = FileDigest digest, replicas})
getChunkInfo :: SndPublicVerifyKey -> XFTPChunkSpec -> IO FileInfo
getChunkInfo sndKey XFTPChunkSpec {filePath = chunkPath, chunkOffset, chunkSize} =
withFile chunkPath ReadMode $ \h -> do
hSeek h AbsoluteSeek $ fromIntegral chunkOffset
digest <- LC.sha256Hash <$> LB.hGet h (fromIntegral chunkSize)
pure FileInfo {sndKey, size = fromIntegral chunkSize, digest}
getXFTPServer :: TVar StdGen -> NonEmpty XFTPServerWithAuth -> IO XFTPServerWithAuth
getXFTPServer gen = \case
srv :| [] -> pure srv
@@ -406,6 +407,13 @@ cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, re
B.writeFile fdSndPath $ strEncode fdSnd
pure (fdRcvPaths, fdSndPath)
getChunkInfo :: SndPublicVerifyKey -> XFTPChunkSpec -> IO FileInfo
getChunkInfo sndKey XFTPChunkSpec {filePath = chunkPath, chunkOffset, chunkSize} =
withFile chunkPath ReadMode $ \h -> do
hSeek h AbsoluteSeek $ fromIntegral chunkOffset
digest <- LC.sha256Hash <$> LB.hGet h (fromIntegral chunkSize)
pure FileInfo {sndKey, size = fromIntegral chunkSize, digest}
cliReceiveFile :: ReceiveOptions -> ExceptT CLIError IO ()
cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath, verbose, yes} =
getFileDescription' fileDescription >>= receiveFile

View File

@@ -25,6 +25,7 @@ module Simplex.FileTransfer.Description
YAMLFileDescription (..), -- for tests
YAMLServerReplicas (..), -- for tests
validateFileDescription,
validateFileDescription',
groupReplicasByServer,
replicaServer,
fdSeparator,
@@ -205,6 +206,17 @@ validateFileDescription = \case
chunkNos = map (chunkNo :: FileChunk -> Int) chunks
chunksSize = fromIntegral . foldl' (\s FileChunk {chunkSize} -> s + unFileSize chunkSize) 0
-- TODO refactor
validateFileDescription' :: FileDescription p -> Either String (ValidFileDescription p)
validateFileDescription' = \case
fd@FileDescription {size, chunks}
| chunkNos /= [1 .. length chunks] -> Left "chunk numbers are not sequential"
| chunksSize chunks /= unFileSize size -> Left "chunks total size is different than file size"
| otherwise -> Right $ ValidFD fd
where
chunkNos = map (chunkNo :: FileChunk -> Int) chunks
chunksSize = fromIntegral . foldl' (\s FileChunk {chunkSize} -> s + unFileSize chunkSize) 0
encodeFileDescription :: FileDescription p -> YAMLFileDescription
encodeFileDescription FileDescription {party, size, digest, key, nonce, chunkSize, chunks} =
YAMLFileDescription

View File

@@ -11,12 +11,13 @@ import Database.SQLite.Simple.FromField (FromField (..))
import Database.SQLite.Simple.ToField (ToField (..))
import Simplex.FileTransfer.Client (XFTPChunkSpec (..))
import Simplex.FileTransfer.Description
import Simplex.Messaging.Agent.Protocol (RcvFileId)
import Simplex.Messaging.Agent.Protocol (RcvFileId, SndFileId)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Parsers (fromTextField_)
import Simplex.Messaging.Protocol
import System.FilePath ((</>))
authTagSize :: Int64
authTagSize = fromIntegral C.authTagSize
@@ -111,26 +112,31 @@ data RcvFileChunkReplica = RcvFileChunkReplica
type DBSndFileId = Int64
data SndFile = SndFile
{ userId :: Int64,
sndFileId :: DBSndFileId,
size :: FileSize Int64,
digest :: FileDigest,
{ sndFileId :: DBSndFileId,
sndFileEntityId :: SndFileId,
userId :: Int64,
numRecipients :: Int,
digest :: Maybe FileDigest,
key :: C.SbKey,
nonce :: C.CbNonce,
chunkSize :: FileSize Word32,
chunks :: [RcvFileChunk],
path :: FilePath,
encPath :: Maybe FilePath,
status :: SndFileStatus
chunks :: [SndFileChunk],
filePath :: FilePath,
prefixPath :: Maybe FilePath,
status :: SndFileStatus,
deleted :: Bool
}
deriving (Eq, Show)
sndFileEncPath :: FilePath -> FilePath
sndFileEncPath prefixPath = prefixPath </> "xftp.encrypted"
data SndFileStatus
= SFSNew
| SFSEncrypting
| SFSEncrypted
| SFSUploading
| SFSComplete
= SFSNew -- db record created
| SFSEncrypting -- encryption started
| SFSEncrypted -- encryption complete
| SFSUploading -- all chunk replicas are created on servers
| SFSComplete -- all chunk replicas are uploaded
| SFSError -- permanent error
deriving (Eq, Show)
instance FromField SndFileStatus where fromField = fromTextField_ textDecode
@@ -144,6 +150,7 @@ instance TextEncoding SndFileStatus where
"encrypted" -> Just SFSEncrypted
"uploading" -> Just SFSUploading
"complete" -> Just SFSComplete
"error" -> Just SFSError
_ -> Nothing
textEncode = \case
SFSNew -> "new"
@@ -151,34 +158,51 @@ instance TextEncoding SndFileStatus where
SFSEncrypted -> "encrypted"
SFSUploading -> "uploading"
SFSComplete -> "complete"
SFSError -> "error"
data SndFileChunk = SndFileChunk
{ userId :: Int64,
sndFileId :: DBSndFileId,
{ sndFileId :: DBSndFileId,
sndFileEntityId :: SndFileId,
userId :: Int64,
numRecipients :: Int,
sndChunkId :: Int64,
chunkNo :: Int,
chunkSpec :: XFTPChunkSpec,
digest :: FileDigest,
replicas :: [SndFileChunkReplica],
delay :: Maybe Int
filePrefixPath :: FilePath,
digest :: Maybe FileDigest,
replicas :: [SndFileChunkReplica]
}
deriving (Eq, Show)
sndChunkSize :: SndFileChunk -> Word32
sndChunkSize SndFileChunk {chunkSpec = XFTPChunkSpec {chunkSize}} = chunkSize
data SndFileChunkReplica = SndFileChunkReplica
{ sndChunkReplicaId :: Int64,
server :: XFTPServer,
replicaId :: ChunkReplicaId,
replicaKey :: C.APrivateSignKey,
rcvIdsKeys :: [(ChunkReplicaId, C.APrivateSignKey)],
-- created :: Bool,
uploaded :: Bool,
replicaStatus :: SndFileReplicaStatus,
delay :: Maybe Int64,
retries :: Int
}
deriving (Eq, Show)
-- to be used in reply to client
data SndFileDescription = SndFileDescription
{ description :: String,
sender :: Bool
}
data SndFileReplicaStatus
= SFRSCreated
| SFRSUploaded
deriving (Eq, Show)
instance FromField SndFileReplicaStatus where fromField = fromTextField_ textDecode
instance ToField SndFileReplicaStatus where toField = toField . textEncode
instance TextEncoding SndFileReplicaStatus where
textDecode = \case
"created" -> Just SFRSCreated
"uploaded" -> Just SFRSUploaded
_ -> Nothing
textEncode = \case
SFRSCreated -> "created"
SFRSUploaded -> "uploaded"

View File

@@ -53,6 +53,9 @@ module Simplex.Messaging.Agent.Client
agentNtfCheckSubscription,
agentNtfDeleteSubscription,
agentXFTPDownloadChunk,
agentXFTPCreateChunk,
agentXFTPUploadChunk,
agentXFTPAddRecipients,
agentCbEncrypt,
agentCbDecrypt,
cryptoError,
@@ -119,12 +122,12 @@ import Data.Word (Word16)
import qualified Database.SQLite.Simple as DB
import GHC.Generics (Generic)
import Network.Socket (HostName)
import Simplex.FileTransfer.Client (XFTPClient, XFTPClientConfig (..), XFTPClientError)
import Simplex.FileTransfer.Client (XFTPChunkSpec, XFTPClient, XFTPClientConfig (..), XFTPClientError)
import qualified Simplex.FileTransfer.Client as X
import Simplex.FileTransfer.Description (ChunkReplicaId (..), kb)
import Simplex.FileTransfer.Protocol (FileInfo (..), FileResponse, XFTPErrorType (DIGEST))
import Simplex.FileTransfer.Transport (XFTPRcvChunkSpec (..))
import Simplex.FileTransfer.Types (RcvFileChunkReplica (..))
import Simplex.FileTransfer.Types (RcvFileChunkReplica (..), SndFileChunkReplica (..))
import Simplex.FileTransfer.Util (uniqueCombine)
import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.Lock
@@ -163,8 +166,11 @@ import Simplex.Messaging.Protocol
QueueIdsKeys (..),
RcvMessage (..),
RcvNtfPublicDhKey,
RecipientId,
SMPMsgMeta (..),
SenderId,
SndPublicVerifyKey,
XFTPServer,
XFTPServerWithAuth,
)
import qualified Simplex.Messaging.Protocol as SMP
@@ -606,8 +612,8 @@ closeClient_ c cVar = do
Just (Right client) -> closeProtocolServerClient client `catchAll_` pure ()
_ -> pure ()
closeXFTPServerClient :: AgentMonad' m => AgentClient -> UserId -> RcvFileChunkReplica -> m ()
closeXFTPServerClient c userId RcvFileChunkReplica {server, replicaId = ChunkReplicaId fId} =
closeXFTPServerClient :: AgentMonad' m => AgentClient -> UserId -> XFTPServer -> ChunkReplicaId -> m ()
closeXFTPServerClient c userId server (ChunkReplicaId fId) =
mkTransportSession c userId server fId >>= liftIO . closeClient c xftpClients
cancelActions :: (Foldable f, Monoid (f (Async ()))) => TVar (f (Async ())) -> IO ()
@@ -1064,6 +1070,18 @@ agentXFTPDownloadChunk :: AgentMonad m => AgentClient -> UserId -> RcvFileChunkR
agentXFTPDownloadChunk c userId RcvFileChunkReplica {server, replicaId = ChunkReplicaId fId, replicaKey} chunkSpec =
withXFTPClient c (userId, server, fId) "FGET" $ \xftp -> X.downloadXFTPChunk xftp replicaKey fId chunkSpec
-- | Register a new file chunk on an XFTP server, returning the sender id and
-- one recipient id per provided recipient public key.
-- TODO(spike): not implemented yet.
agentXFTPCreateChunk :: AgentMonad m => AgentClient -> UserId -> XFTPServerWithAuth -> C.APrivateSignKey -> FileInfo -> NonEmpty C.APublicVerifyKey -> m (SenderId, NonEmpty RecipientId)
agentXFTPCreateChunk c userId srv spKey file rcps =
  undefined
-- | Upload chunk data for a previously created replica.
-- Fixed parameter name: @usedId@ -> @userId@, consistent with
-- 'agentXFTPDownloadChunk' and 'agentXFTPCreateChunk'.
-- TODO(spike): not implemented yet.
agentXFTPUploadChunk :: AgentMonad m => AgentClient -> UserId -> SndFileChunkReplica -> XFTPChunkSpec -> m ()
agentXFTPUploadChunk c userId SndFileChunkReplica {server, replicaId = ChunkReplicaId fId, replicaKey} chunkSpec =
  undefined
-- | Add recipients to an uploaded chunk replica, returning one recipient id
-- per provided public key.
-- Fixed parameter name: @usedId@ -> @userId@, consistent with the sibling
-- agentXFTP* functions.
-- TODO(spike): not implemented yet.
agentXFTPAddRecipients :: AgentMonad m => AgentClient -> UserId -> SndFileChunkReplica -> NonEmpty C.APublicVerifyKey -> m (NonEmpty RecipientId)
agentXFTPAddRecipients c userId SndFileChunkReplica {server, replicaId = ChunkReplicaId fId, replicaKey} rcps =
  undefined
agentCbEncrypt :: AgentMonad m => SndQueue -> Maybe C.PublicKeyX25519 -> ByteString -> m ByteString
agentCbEncrypt SndQueue {e2eDhSecret, smpClientVersion} e2ePubKey msg = do
cmNonce <- liftIO C.randomCbNonce

View File

@@ -84,6 +84,7 @@ data AgentConfig = AgentConfig
cleanupInterval :: Int64,
rcvFilesTTL :: NominalDiffTime,
xftpNotifyErrsOnRetry :: Bool,
xftpMaxRecipientsPerRequest :: Int,
deleteErrorCount :: Int,
ntfCron :: Word16,
ntfWorkerDelay :: Int,
@@ -145,6 +146,7 @@ defaultAgentConfig =
cleanupInterval = 30 * 60 * 1000000, -- 30 minutes
rcvFilesTTL = 2 * nominalDay,
xftpNotifyErrsOnRetry = True,
xftpMaxRecipientsPerRequest = 200,
deleteErrorCount = 10,
ntfCron = 20, -- minutes
ntfWorkerDelay = 100000, -- microseconds
@@ -205,17 +207,15 @@ newNtfSubSupervisor qSize = do
data XFTPAgent = XFTPAgent
{ -- if set, XFTP file paths will be considered as relative to this directory
xftpWorkDir :: TVar (Maybe FilePath),
xftpWorkers :: TMap (Maybe XFTPServer) (TMVar (), Async ())
xftpRcvWorkers :: TMap (Maybe XFTPServer) (TMVar (), Async ()),
-- separate send workers for unhindered concurrency between download and upload,
-- clients can also be separated by passing direction to withXFTPClient, and differentiating by it
-- xftpSndWorkers :: TMap (Maybe XFTPServer) (TMVar (), Async ()),
-- files currently in upload - to throttle upload of other files' chunks,
-- this optimization can be dropped for the MVP
-- xftpSndFiles :: TVar (Set DBSndFileId)
xftpSndWorkers :: TMap (Maybe XFTPServer) (TMVar (), Async ())
}
newXFTPAgent :: STM XFTPAgent
newXFTPAgent = do
xftpWorkDir <- newTVar Nothing
xftpWorkers <- TM.empty
pure XFTPAgent {xftpWorkDir, xftpWorkers}
xftpRcvWorkers <- TM.empty
xftpSndWorkers <- TM.empty
pure XFTPAgent {xftpWorkDir, xftpRcvWorkers, xftpSndWorkers}

View File

@@ -129,9 +129,11 @@ module Simplex.Messaging.Agent.Store.SQLite
getActiveNtfToken,
getNtfRcvQueue,
setConnectionNtfs,
-- File transfer
-- * File transfer
-- Rcv files
createRcvFile,
getRcvFile,
getRcvFileByEntityId,
updateRcvChunkReplicaDelay,
updateRcvFileChunkReceived,
@@ -147,6 +149,19 @@ module Simplex.Messaging.Agent.Store.SQLite
getCleanupRcvFilesTmpPaths,
getCleanupRcvFilesDeleted,
getRcvFilesExpired,
-- Snd files
createSndFile,
getSndFile,
getNextSndFileToPrepare,
updateSndFileError,
updateSndFileStatus,
updateSndFileEncrypted,
updateSndFileComplete,
createSndFileReplica,
getNextSndChunkToUpload,
updateSndChunkReplicaDelay,
addSndChunkReplicaRecipients,
updateSndChunkReplicaStatus,
-- * utilities
withConnection,
@@ -191,6 +206,7 @@ import Database.SQLite.Simple.ToField (ToField (..))
import qualified Database.SQLite3 as SQLite3
import GHC.Generics (Generic)
import Network.Socket (ServiceName)
import Simplex.FileTransfer.Client (XFTPChunkSpec (..))
import Simplex.FileTransfer.Description
import Simplex.FileTransfer.Protocol (FileParty (..))
import Simplex.FileTransfer.Types
@@ -1868,9 +1884,9 @@ getRcvFileIdByEntityId_ db userId rcvFileEntityId =
getRcvFile :: DB.Connection -> DBRcvFileId -> IO (Either StoreError RcvFile)
getRcvFile db rcvFileId = runExceptT $ do
fd@RcvFile {rcvFileEntityId, userId, tmpPath} <- ExceptT getFile
f@RcvFile {rcvFileEntityId, userId, tmpPath} <- ExceptT getFile
chunks <- maybe (pure []) (liftIO . getChunks rcvFileEntityId userId) tmpPath
pure (fd {chunks} :: RcvFile)
pure (f {chunks} :: RcvFile)
where
getFile :: IO (Either StoreError RcvFile)
getFile = do
@@ -2075,3 +2091,218 @@ getRcvFilesExpired db ttl = do
WHERE created_at < ?
|]
(Only cutoffTs)
-- | Insert a new snd_files row with a randomly generated entity id
-- and initial status SFSNew.
createSndFile :: DB.Connection -> TVar ChaChaDRG -> UserId -> Int -> FilePath -> FilePath -> C.SbKey -> C.CbNonce -> IO (Either StoreError SndFileId)
createSndFile db gVar userId numRecipients path prefixPath key nonce =
  createWithRandomId gVar insertFile
  where
    insertFile sndFileEntityId =
      DB.execute
        db
        "INSERT INTO snd_files (snd_file_entity_id, user_id, num_recipients, key, nonce, path, prefix_path, status) VALUES (?,?,?,?,?,?,?,?)"
        (sndFileEntityId, userId, numRecipients, key, nonce, path, prefixPath, SFSNew)
-- | Load a snd file together with its chunks and chunk replicas (including
-- per-replica recipient ids/keys). Chunks are only loaded while prefix_path
-- is set - it is cleared by 'updateSndFileComplete' and 'updateSndFileError'.
getSndFile :: DB.Connection -> DBSndFileId -> IO (Either StoreError SndFile)
getSndFile db sndFileId = runExceptT $ do
  f@SndFile {sndFileEntityId, userId, numRecipients, prefixPath} <- ExceptT getFile
  chunks <- maybe (pure []) (liftIO . getChunks sndFileEntityId userId numRecipients) prefixPath
  pure (f {chunks} :: SndFile)
  where
    -- the snd_files row itself, with chunks left empty
    getFile :: IO (Either StoreError SndFile)
    getFile = do
      firstRow toFile SEFileNotFound $
        DB.query
          db
          [sql|
            SELECT snd_file_entity_id, user_id, num_recipients, digest, key, nonce, path, prefix_path, status, deleted
            FROM snd_files
            WHERE snd_file_id = ?
          |]
          (Only sndFileId)
      where
        toFile :: (SndFileId, UserId, Int, Maybe FileDigest, C.SbKey, C.CbNonce, FilePath, Maybe FilePath, SndFileStatus, Bool) -> SndFile
        toFile (sndFileEntityId, userId, numRecipients, digest, key, nonce, filePath, prefixPath, status, deleted) =
          SndFile {sndFileId, sndFileEntityId, userId, numRecipients, digest, key, nonce, filePath, prefixPath, status, deleted, chunks = []}
    -- chunk rows, each with an XFTPChunkSpec pointing into the encrypted
    -- file under the file's prefix path
    getChunks :: SndFileId -> UserId -> Int -> FilePath -> IO [SndFileChunk]
    getChunks sndFileEntityId userId numRecipients filePrefixPath = do
      chunks <-
        map toChunk
          <$> DB.query
            db
            [sql|
              SELECT snd_file_chunk_id, chunk_no, chunk_offset, chunk_size, digest
              FROM snd_file_chunks
              WHERE snd_file_id = ?
            |]
            (Only sndFileId)
      forM chunks $ \chunk@SndFileChunk {sndChunkId} -> do
        replicas' <- getChunkReplicas sndChunkId
        pure (chunk {replicas = replicas'} :: SndFileChunk)
      where
        toChunk :: (Int64, Int, Int64, Word32, Maybe FileDigest) -> SndFileChunk
        toChunk (sndChunkId, chunkNo, chunkOffset, chunkSize, digest) =
          let chunkSpec = XFTPChunkSpec {filePath = sndFileEncPath filePrefixPath, chunkOffset, chunkSize}
           in SndFileChunk {sndFileId, sndFileEntityId, userId, numRecipients, sndChunkId, chunkNo, chunkSpec, filePrefixPath, digest, replicas = []}
    -- replicas of one chunk, each completed with its recipient ids/keys
    getChunkReplicas :: Int64 -> IO [SndFileChunkReplica]
    getChunkReplicas chunkId = do
      replicas <-
        map toReplica
          <$> DB.query
            db
            [sql|
              SELECT
                r.snd_file_chunk_replica_id, r.replica_id, r.replica_key, r.replica_status, r.delay, r.retries,
                s.xftp_host, s.xftp_port, s.xftp_key_hash
              FROM snd_file_chunk_replicas r
              JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
              WHERE r.snd_file_chunk_id = ?
            |]
            (Only chunkId)
      forM replicas $ \replica@SndFileChunkReplica {sndChunkReplicaId} -> do
        rcvIdsKeys <- getChunkReplicaRecipients_ db sndChunkReplicaId
        pure replica {rcvIdsKeys}
      where
        toReplica :: (Int64, ChunkReplicaId, C.APrivateSignKey, SndFileReplicaStatus, Maybe Int64, Int, NonEmpty TransportHost, ServiceName, C.KeyHash) -> SndFileChunkReplica
        toReplica (sndChunkReplicaId, replicaId, replicaKey, replicaStatus, delay, retries, host, port, keyHash) =
          let server = XFTPServer host port keyHash
           in SndFileChunkReplica {sndChunkReplicaId, server, replicaId, replicaKey, replicaStatus, delay, retries, rcvIdsKeys = []}
-- | Recipient ids and keys registered for one chunk replica.
getChunkReplicaRecipients_ :: DB.Connection -> Int64 -> IO [(ChunkReplicaId, C.APrivateSignKey)]
getChunkReplicaRecipients_ db replicaId =
  DB.query db recipientsQuery (Only replicaId)
  where
    recipientsQuery =
      [sql|
        SELECT rcv_replica_id, rcv_replica_key
        FROM snd_file_chunk_replica_recipients
        WHERE snd_file_chunk_replica_id = ?
      |]
-- | Pick the oldest non-deleted snd file still requiring preparation
-- (status new, encrypting, or encrypted).
getNextSndFileToPrepare :: DB.Connection -> IO (Maybe SndFile)
getNextSndFileToPrepare db =
  nextFileId >>= maybe (pure Nothing) (fmap eitherToMaybe . getSndFile db)
  where
    nextFileId :: IO (Maybe DBSndFileId)
    nextFileId =
      maybeFirstRow fromOnly $
        DB.query
          db
          [sql|
            SELECT snd_file_id
            FROM snd_files
            WHERE status IN (?,?,?) AND deleted = 0
            ORDER BY created_at ASC LIMIT 1
          |]
          (SFSNew, SFSEncrypting, SFSEncrypted)
-- | Record a permanent send error: clears prefix_path (temporary files are
-- no longer needed) and sets status to SFSError.
updateSndFileError :: DB.Connection -> DBSndFileId -> String -> IO ()
updateSndFileError db sndFileId errStr = do
  ts <- getCurrentTime
  DB.execute db "UPDATE snd_files SET prefix_path = NULL, error = ?, status = ?, updated_at = ? WHERE snd_file_id = ?" (errStr, SFSError, ts, sndFileId)
-- | Set the file status, refreshing updated_at.
updateSndFileStatus :: DB.Connection -> DBSndFileId -> SndFileStatus -> IO ()
updateSndFileStatus db sndFileId status =
  getCurrentTime >>= \ts ->
    DB.execute db "UPDATE snd_files SET status = ?, updated_at = ? WHERE snd_file_id = ?" (status, ts, sndFileId)
-- | Mark the file as encrypted, store its digest, and create one
-- snd_file_chunks row per chunk spec (chunk numbers start at 1).
updateSndFileEncrypted :: DB.Connection -> DBSndFileId -> FileDigest -> [XFTPChunkSpec] -> IO ()
updateSndFileEncrypted db sndFileId digest chunkSpecs = do
  ts <- getCurrentTime
  DB.execute db "UPDATE snd_files SET status = ?, digest = ?, updated_at = ? WHERE snd_file_id = ?" (SFSEncrypted, digest, ts, sndFileId)
  mapM_ insertChunk $ zip [1 ..] chunkSpecs
  where
    insertChunk :: (Int, XFTPChunkSpec) -> IO ()
    insertChunk (chunkNo, XFTPChunkSpec {chunkOffset, chunkSize}) =
      DB.execute db "INSERT INTO snd_file_chunks (snd_file_id, chunk_no, chunk_offset, chunk_size) VALUES (?,?,?,?)" (sndFileId, chunkNo, chunkOffset, chunkSize)
-- | Mark the file as complete and clear prefix_path (temporary files
-- are no longer needed).
updateSndFileComplete :: DB.Connection -> DBSndFileId -> IO ()
updateSndFileComplete db sndFileId = do
  ts <- getCurrentTime
  DB.execute db "UPDATE snd_files SET prefix_path = NULL, status = ?, updated_at = ? WHERE snd_file_id = ?" (SFSComplete, ts, sndFileId)
-- | Store the server-registered replica of a chunk: set the chunk digest,
-- insert the replica row (replica_number 1, status "created"), then insert
-- the per-recipient ids/keys returned by the server.
createSndFileReplica :: DB.Connection -> Int64 -> FileDigest -> XFTPServer -> ChunkReplicaId -> C.APrivateSignKey -> [(ChunkReplicaId, C.APrivateSignKey)] -> IO ()
createSndFileReplica db sndChunkId digest xftpServer sndId spKey rcvIdsKeys = do
  srvId <- createXFTPServer_ db xftpServer
  ts <- getCurrentTime
  DB.execute db "UPDATE snd_file_chunks SET digest = ?, updated_at = ? WHERE snd_file_chunk_id = ?" (digest, ts, sndChunkId)
  DB.execute
    db
    [sql|
      INSERT INTO snd_file_chunk_replicas
        (snd_file_chunk_id, replica_number, xftp_server_id, replica_id, replica_key, replica_status)
      VALUES (?,?,?,?,?,?)
    |]
    (sndChunkId, 1 :: Int, srvId, sndId, spKey, SFRSCreated)
  replicaRowId <- insertedRowId db
  mapM_ (insertRecipient replicaRowId) rcvIdsKeys
  where
    insertRecipient replicaRowId (rcvId, rcvKey) =
      DB.execute
        db
        [sql|
          INSERT INTO snd_file_chunk_replica_recipients
            (snd_file_chunk_replica_id, rcv_replica_id, rcv_replica_key)
          VALUES (?,?,?)
        |]
        (replicaRowId, rcvId, rcvKey)
-- | Pick the oldest "created" first replica on this server belonging to a
-- non-deleted file that is encrypted or uploading, with its recipient keys.
--
-- Fix: the replica status filter referenced @r.status@, but the column in
-- snd_file_chunk_replicas is @replica_status@ (see the schema and the SELECT
-- in getSndFile's getChunkReplicas) - the query would fail at runtime.
getNextSndChunkToUpload :: DB.Connection -> XFTPServer -> IO (Maybe SndFileChunk)
getNextSndChunkToUpload db server@ProtocolServer {host, port, keyHash} = do
  chunk_ <-
    maybeFirstRow toChunk $
      DB.query
        db
        [sql|
          SELECT
            f.snd_file_id, f.snd_file_entity_id, f.user_id, f.num_recipients, f.prefix_path,
            c.snd_file_chunk_id, c.chunk_no, c.chunk_offset, c.chunk_size, c.digest,
            r.snd_file_chunk_replica_id, r.replica_id, r.replica_key, r.replica_status, r.delay, r.retries
          FROM snd_file_chunk_replicas r
          JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
          JOIN snd_file_chunks c ON c.snd_file_chunk_id = r.snd_file_chunk_id
          JOIN snd_files f ON f.snd_file_id = c.snd_file_id
          WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ?
            AND r.replica_status = ? AND r.replica_number = 1
            AND (f.status = ? OR f.status = ?) AND f.deleted = 0
          ORDER BY r.created_at ASC
          LIMIT 1
        |]
        (host, port, keyHash, SFRSCreated, SFSEncrypted, SFSUploading)
  -- complete the single selected replica with its recipient ids/keys
  forM chunk_ $ \chunk@SndFileChunk {replicas} -> do
    replicas' <- forM replicas $ \replica@SndFileChunkReplica {sndChunkReplicaId} -> do
      rcvIdsKeys <- getChunkReplicaRecipients_ db sndChunkReplicaId
      pure replica {rcvIdsKeys}
    pure (chunk {replicas = replicas'} :: SndFileChunk)
  where
    toChunk :: ((DBSndFileId, SndFileId, UserId, Int, FilePath) :. (Int64, Int, Int64, Word32, Maybe FileDigest) :. (Int64, ChunkReplicaId, C.APrivateSignKey, SndFileReplicaStatus, Maybe Int64, Int)) -> SndFileChunk
    toChunk ((sndFileId, sndFileEntityId, userId, numRecipients, filePrefixPath) :. (sndChunkId, chunkNo, chunkOffset, chunkSize, digest) :. (sndChunkReplicaId, replicaId, replicaKey, replicaStatus, delay, retries)) =
      let chunkSpec = XFTPChunkSpec {filePath = sndFileEncPath filePrefixPath, chunkOffset, chunkSize}
       in SndFileChunk
            { sndFileId,
              sndFileEntityId,
              userId,
              numRecipients,
              sndChunkId,
              chunkNo,
              chunkSpec,
              digest,
              filePrefixPath,
              replicas = [SndFileChunkReplica {sndChunkReplicaId, server, replicaId, replicaKey, replicaStatus, delay, retries, rcvIdsKeys = []}]
            }
-- | Store the next retry delay for a replica and bump its retry counter.
updateSndChunkReplicaDelay :: DB.Connection -> Int64 -> Int64 -> IO ()
updateSndChunkReplicaDelay db replicaId delay =
  getCurrentTime >>= \ts ->
    DB.execute db "UPDATE snd_file_chunk_replicas SET delay = ?, retries = retries + 1, updated_at = ? WHERE snd_file_chunk_replica_id = ?" (delay, ts, replicaId)
-- | Add recipient ids/keys to a replica and return it with the complete
-- recipient list re-read from the database.
addSndChunkReplicaRecipients :: DB.Connection -> SndFileChunkReplica -> [(ChunkReplicaId, C.APrivateSignKey)] -> IO SndFileChunkReplica
addSndChunkReplicaRecipients db r@SndFileChunkReplica {sndChunkReplicaId} rcvIdsKeys = do
  mapM_ insertRecipient rcvIdsKeys
  rcvIdsKeys' <- getChunkReplicaRecipients_ db sndChunkReplicaId
  pure r {rcvIdsKeys = rcvIdsKeys'}
  where
    insertRecipient (rcvId, rcvKey) =
      DB.execute
        db
        [sql|
          INSERT INTO snd_file_chunk_replica_recipients
            (snd_file_chunk_replica_id, rcv_replica_id, rcv_replica_key)
          VALUES (?,?,?)
        |]
        (sndChunkReplicaId, rcvId, rcvKey)
-- | Set the status of a chunk replica.
--
-- Fix: the UPDATE targeted @status@, but the column in
-- snd_file_chunk_replicas is @replica_status@ (per the migration schema and
-- every SELECT on this table) - the statement would fail at runtime.
updateSndChunkReplicaStatus :: DB.Connection -> Int64 -> SndFileReplicaStatus -> IO ()
updateSndChunkReplicaStatus db replicaId status = do
  updatedAt <- getCurrentTime
  DB.execute db "UPDATE snd_file_chunk_replicas SET replica_status = ?, updated_at = ? WHERE snd_file_chunk_replica_id = ?" (status, updatedAt, replicaId)

View File

@@ -57,6 +57,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230120_delete_errors
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230217_server_key_hash
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230223_files
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230320_retry_state
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230401_snd_files
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Parsers (dropPrefix, sumTypeJSON)
import Simplex.Messaging.Transport.Client (TransportHost)
@@ -80,7 +81,8 @@ schemaMigrations =
("m20230120_delete_errors", m20230120_delete_errors, Nothing),
("m20230217_server_key_hash", m20230217_server_key_hash, Nothing),
("m20230223_files", m20230223_files, Just down_m20230223_files),
("m20230320_retry_state", m20230320_retry_state, Just down_m20230320_retry_state)
("m20230320_retry_state", m20230320_retry_state, Just down_m20230320_retry_state),
("m20230401_snd_files", m20230401_snd_files, Just down_m20230401_snd_files)
]
-- | The list of migrations in ascending order by date

View File

@@ -5,21 +5,22 @@ module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230401_snd_files where
import Database.SQLite.Simple (Query)
import Database.SQLite.Simple.QQ (sql)
-- this migration is a draft - it is not included in the list of migrations
m20230401_snd_files :: Query
m20230401_snd_files =
[sql|
CREATE TABLE snd_files (
snd_file_id INTEGER PRIMARY KEY AUTOINCREMENT,
snd_file_id INTEGER PRIMARY KEY,
snd_file_entity_id BLOB NOT NULL,
user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE,
size INTEGER NOT NULL,
digest BLOB NOT NULL,
key BLOB NOT NULL,
nonce BLOB NOT NULL,
chunk_size INTEGER NOT NULL,
num_recipients INTEGER NOT NULL,
digest BLOB,
key BLOB NOT NULL,
nonce BLOB NOT NULL,
path TEXT NOT NULL,
enc_path TEXT,
prefix_path TEXT,
status TEXT NOT NULL,
deleted INTEGER NOT NULL DEFAULT 0,
error TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
@@ -32,18 +33,13 @@ CREATE TABLE snd_file_chunks (
chunk_no INTEGER NOT NULL,
chunk_offset INTEGER NOT NULL,
chunk_size INTEGER NOT NULL,
digest BLOB NOT NULL,
delay INTEGER,
digest BLOB,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX idx_snd_file_chunks_snd_file_id ON snd_file_chunks(snd_file_id);
-- ? add fk to snd_file_descriptions?
-- ? probably it's not necessary since these entities are
-- ? required at different stages of sending files -
-- ? replicas on upload, description on notifying client
CREATE TABLE snd_file_chunk_replicas (
snd_file_chunk_replica_id INTEGER PRIMARY KEY,
snd_file_chunk_id INTEGER NOT NULL REFERENCES snd_file_chunks ON DELETE CASCADE,
@@ -51,8 +47,8 @@ CREATE TABLE snd_file_chunk_replicas (
xftp_server_id INTEGER NOT NULL REFERENCES xftp_servers ON DELETE CASCADE,
replica_id BLOB NOT NULL,
replica_key BLOB NOT NULL,
-- created INTEGER NOT NULL DEFAULT 0, -- as in XFTP create - registered on server
uploaded INTEGER NOT NULL DEFAULT 0,
replica_status TEXT NOT NULL,
delay INTEGER,
retries INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
@@ -71,15 +67,21 @@ CREATE TABLE snd_file_chunk_replica_recipients (
);
CREATE INDEX idx_snd_file_chunk_replica_recipients_snd_file_chunk_replica_id ON snd_file_chunk_replica_recipients(snd_file_chunk_replica_id);
CREATE TABLE snd_file_descriptions (
snd_file_description_id INTEGER PRIMARY KEY,
snd_file_id INTEGER NOT NULL REFERENCES snd_files ON DELETE CASCADE,
sender INTEGER NOT NULL, -- 1 for sender file description
descr_text TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX idx_snd_file_descriptions_snd_file_id ON snd_file_descriptions(snd_file_id);
|]
-- | Reverse migration: drop snd file tables in reverse dependency order
-- (recipients -> replicas -> chunks -> files), each index before its table.
down_m20230401_snd_files :: Query
down_m20230401_snd_files =
  [sql|
DROP INDEX idx_snd_file_chunk_replica_recipients_snd_file_chunk_replica_id;
DROP TABLE snd_file_chunk_replica_recipients;
DROP INDEX idx_snd_file_chunk_replicas_snd_file_chunk_id;
DROP INDEX idx_snd_file_chunk_replicas_xftp_server_id;
DROP TABLE snd_file_chunk_replicas;
DROP INDEX idx_snd_file_chunks_snd_file_id;
DROP TABLE snd_file_chunks;
DROP INDEX idx_snd_files_user_id;
DROP TABLE snd_files;
|]

View File

@@ -345,3 +345,61 @@ CREATE INDEX idx_rcv_file_chunk_replicas_rcv_file_chunk_id ON rcv_file_chunk_rep
CREATE INDEX idx_rcv_file_chunk_replicas_xftp_server_id ON rcv_file_chunk_replicas(
xftp_server_id
);
-- XFTP send-side tables (added by migration m20230401_snd_files).
-- NOTE(review): "NOT NUll" casing below is copied verbatim from the migration;
-- SQL keywords are case-insensitive, so it is harmless, but fixing it requires
-- changing the migration and regenerating this schema dump together.
CREATE TABLE snd_files(
  snd_file_id INTEGER PRIMARY KEY,
  snd_file_entity_id BLOB NOT NULL,
  user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE,
  num_recipients INTEGER NOT NULL,
  digest BLOB,
  key BLOB NOT NUll,
  nonce BLOB NOT NUll,
  path TEXT NOT NULL,
  prefix_path TEXT,
  status TEXT NOT NULL,
  deleted INTEGER NOT NULL DEFAULT 0,
  error TEXT,
  created_at TEXT NOT NULL DEFAULT(datetime('now')),
  updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
CREATE INDEX idx_snd_files_user_id ON snd_files(user_id);
-- one row per encrypted-file chunk; digest is filled once computed
CREATE TABLE snd_file_chunks(
  snd_file_chunk_id INTEGER PRIMARY KEY,
  snd_file_id INTEGER NOT NULL REFERENCES snd_files ON DELETE CASCADE,
  chunk_no INTEGER NOT NULL,
  chunk_offset INTEGER NOT NULL,
  chunk_size INTEGER NOT NULL,
  digest BLOB,
  created_at TEXT NOT NULL DEFAULT(datetime('now')),
  updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
CREATE INDEX idx_snd_file_chunks_snd_file_id ON snd_file_chunks(snd_file_id);
-- per-server replica of a chunk; replica_status tracks created/uploaded
CREATE TABLE snd_file_chunk_replicas(
  snd_file_chunk_replica_id INTEGER PRIMARY KEY,
  snd_file_chunk_id INTEGER NOT NULL REFERENCES snd_file_chunks ON DELETE CASCADE,
  replica_number INTEGER NOT NULL,
  xftp_server_id INTEGER NOT NULL REFERENCES xftp_servers ON DELETE CASCADE,
  replica_id BLOB NOT NULL,
  replica_key BLOB NOT NULL,
  replica_status TEXT NOT NULL,
  delay INTEGER,
  retries INTEGER NOT NULL DEFAULT 0,
  created_at TEXT NOT NULL DEFAULT(datetime('now')),
  updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
CREATE INDEX idx_snd_file_chunk_replicas_snd_file_chunk_id ON snd_file_chunk_replicas(
  snd_file_chunk_id
);
CREATE INDEX idx_snd_file_chunk_replicas_xftp_server_id ON snd_file_chunk_replicas(
  xftp_server_id
);
-- recipient ids/keys issued by the server for one replica
CREATE TABLE snd_file_chunk_replica_recipients(
  snd_file_chunk_replica_recipient_id INTEGER PRIMARY KEY,
  snd_file_chunk_replica_id INTEGER NOT NULL REFERENCES snd_file_chunk_replicas ON DELETE CASCADE,
  rcv_replica_id BLOB NOT NULL,
  rcv_replica_key BLOB NOT NULL,
  created_at TEXT NOT NULL DEFAULT(datetime('now')),
  updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
CREATE INDEX idx_snd_file_chunk_replica_recipients_snd_file_chunk_replica_id ON snd_file_chunk_replica_recipients(
  snd_file_chunk_replica_id
);