From d8e60ecfdbc28e3d9b3da80accfd4a6732f58433 Mon Sep 17 00:00:00 2001 From: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> Date: Fri, 7 Apr 2023 11:19:50 +0400 Subject: [PATCH] xftp: recoverable send - spike (#707) --- src/Simplex/FileTransfer/Agent.hs | 342 ++++++++++++------ src/Simplex/FileTransfer/Client/Main.hs | 20 +- src/Simplex/FileTransfer/Description.hs | 12 + src/Simplex/FileTransfer/Types.hs | 78 ++-- src/Simplex/Messaging/Agent/Client.hs | 26 +- src/Simplex/Messaging/Agent/Env/SQLite.hs | 14 +- src/Simplex/Messaging/Agent/Store/SQLite.hs | 239 +++++++++++- .../Agent/Store/SQLite/Migrations.hs | 4 +- .../SQLite/Migrations/M20230401_snd_files.hs | 56 +-- .../Store/SQLite/Migrations/agent_schema.sql | 58 +++ 10 files changed, 666 insertions(+), 183 deletions(-) diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index 4674c4109..547e58326 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -19,7 +19,7 @@ module Simplex.FileTransfer.Agent deleteRcvFile, -- Sending files sendFileExperimental, - _sendFile, + sendFile, ) where @@ -33,18 +33,21 @@ import Data.Bifunctor (first) import Data.ByteString (ByteString) import qualified Data.ByteString.Base64.URL as U import qualified Data.ByteString.Char8 as B +import qualified Data.ByteString.Lazy.Char8 as LB import Data.Composition ((.:)) import Data.Int (Int64) import Data.List (foldl', isSuffixOf, partition) import Data.List.NonEmpty (nonEmpty) import qualified Data.List.NonEmpty as L import qualified Data.Map.Strict as M +import Data.Text (Text) import Data.Time.Clock (getCurrentTime) import Data.Time.Format (defaultTimeLocale, formatTime) -import Simplex.FileTransfer.Client.Main (CLIError, SendOptions (..), cliSendFileOpts) +import Simplex.FileTransfer.Client (XFTPChunkSpec (..)) +import Simplex.FileTransfer.Client.Main import Simplex.FileTransfer.Crypto import Simplex.FileTransfer.Description -import Simplex.FileTransfer.Protocol 
(FileParty (..), FilePartyI) +import Simplex.FileTransfer.Protocol (FileInfo (..), FileParty (..), FilePartyI, SFileParty (..)) import Simplex.FileTransfer.Transport (XFTPRcvChunkSpec (..)) import Simplex.FileTransfer.Types import Simplex.FileTransfer.Util (removePath, uniqueCombine) @@ -53,8 +56,12 @@ import Simplex.Messaging.Agent.Env.SQLite import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval import Simplex.Messaging.Agent.Store.SQLite +import qualified Simplex.Messaging.Crypto as C +import qualified Simplex.Messaging.Crypto.Lazy as LC +import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (EntityId, XFTPServer, XFTPServerWithAuth) +import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Util (liftError, liftIOEither, tshow, whenM) import System.FilePath (takeFileName, ()) @@ -72,24 +79,25 @@ startWorkers c workDir = do startFiles = do rcvFilesTTL <- asks (rcvFilesTTL . config) pendingRcvServers <- withStore' c (`getPendingRcvFilesServers` rcvFilesTTL) - forM_ pendingRcvServers $ \s -> addXFTPWorker c (Just s) + forM_ pendingRcvServers $ \s -> addXFTPRcvWorker c (Just s) -- start local worker for files pending decryption, -- no need to make an extra query for the check -- as the worker will check the store anyway - addXFTPWorker c Nothing + addXFTPRcvWorker c Nothing closeXFTPAgent :: MonadUnliftIO m => XFTPAgent -> m () -closeXFTPAgent XFTPAgent {xftpWorkers} = do - ws <- atomically $ stateTVar xftpWorkers (,M.empty) - mapM_ (uninterruptibleCancel . snd) ws +closeXFTPAgent XFTPAgent {xftpRcvWorkers, xftpSndWorkers} = do + stopWorkers xftpRcvWorkers + stopWorkers xftpSndWorkers + where + stopWorkers wsSel = do + ws <- atomically $ stateTVar wsSel (,M.empty) + mapM_ (uninterruptibleCancel . 
snd) ws receiveFile :: AgentMonad m => AgentClient -> UserId -> ValidFileDescription 'FRecipient -> m RcvFileId receiveFile c userId (ValidFileDescription fd@FileDescription {chunks}) = do g <- asks idsDrg - workPath <- getXFTPWorkPath - ts <- liftIO getCurrentTime - let isoTime = formatTime defaultTimeLocale "%Y%m%d_%H%M%S_%6q" ts - prefixPath <- uniqueCombine workPath (isoTime <> "_rcv.xftp") + prefixPath <- getPrefixPath "rcv.xftp" createDirectory prefixPath let relPrefixPath = takeFileName prefixPath relTmpPath = relPrefixPath "xftp.encrypted" @@ -102,9 +110,16 @@ receiveFile c userId (ValidFileDescription fd@FileDescription {chunks}) = do where downloadChunk :: AgentMonad m => FileChunk -> m () downloadChunk FileChunk {replicas = (FileChunkReplica {server} : _)} = do - addXFTPWorker c (Just server) + addXFTPRcvWorker c (Just server) downloadChunk _ = throwError $ INTERNAL "no replicas" +getPrefixPath :: AgentMonad m => String -> m FilePath +getPrefixPath suffix = do + workPath <- getXFTPWorkPath + ts <- liftIO getCurrentTime + let isoTime = formatTime defaultTimeLocale "%Y%m%d_%H%M%S_%6q" ts + uniqueCombine workPath (isoTime <> "_" <> suffix) + toFSFilePath :: AgentMonad m => FilePath -> m FilePath toFSFilePath f = ( f) <$> getXFTPWorkPath @@ -113,22 +128,32 @@ createEmptyFile fPath = do h <- openFile fPath AppendMode liftIO $ B.hPut h "" >> hFlush h -addXFTPWorker :: AgentMonad m => AgentClient -> Maybe XFTPServer -> m () -addXFTPWorker c srv_ = do - ws <- asks $ xftpWorkers . xftpAgent +addXFTPRcvWorker :: AgentMonad m => AgentClient -> Maybe XFTPServer -> m () +addXFTPRcvWorker c = addWorker c xftpRcvWorkers runXFTPRcvWorker runXFTPRcvLocalWorker + +addWorker :: + AgentMonad m => + AgentClient -> + (XFTPAgent -> TMap (Maybe XFTPServer) (TMVar (), Async ())) -> + (AgentClient -> XFTPServer -> TMVar () -> m ()) -> + (AgentClient -> TMVar () -> m ()) -> + Maybe XFTPServer -> + m () +addWorker c wsSel runWorker runWorkerNoSrv srv_ = do + ws <- asks $ wsSel . 
xftpAgent atomically (TM.lookup srv_ ws) >>= \case Nothing -> do doWork <- newTMVarIO () - let runWorker = case srv_ of - Just srv -> runXFTPWorker c srv doWork - Nothing -> runXFTPLocalWorker c doWork - worker <- async $ runWorker `E.finally` atomically (TM.delete srv_ ws) + let runWorker' = case srv_ of + Just srv -> runWorker c srv doWork + Nothing -> runWorkerNoSrv c doWork + worker <- async $ runWorker' `E.finally` atomically (TM.delete srv_ ws) atomically $ TM.insert srv_ (doWork, worker) ws Just (doWork, _) -> void . atomically $ tryPutTMVar doWork () -runXFTPWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m () -runXFTPWorker c srv doWork = do +runXFTPRcvWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m () +runXFTPRcvWorker c srv doWork = do forever $ do void . atomically $ readTMVar doWork agentOperationBracket c AORcvNetwork waitUntilActive runXftpOperation @@ -140,30 +165,20 @@ runXFTPWorker c srv doWork = do nextChunk <- withStore' c $ \db -> getNextRcvChunkToDownload db srv rcvFilesTTL case nextChunk of Nothing -> noWorkToDo - Just RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []} -> workerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) "chunk has no replicas" - Just fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, delay} : _} -> do + Just RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []} -> rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) "chunk has no replicas" + Just fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, delay} : _} -> do ri <- asks $ reconnectInterval . 
config let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay withRetryInterval ri' $ \delay' loop -> downloadFileChunk fc replica - `catchError` retryOnError delay' loop (workerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) . show) + `catchError` \e -> retryOnError c AORcvNetwork "XFTP rcv worker" loop (retryMaintenance e delay') (retryDone e) e where - retryOnError :: Int64 -> m () -> (AgentErrorType -> m ()) -> AgentErrorType -> m () - retryOnError replicaDelay loop done e = do - logError $ "XFTP worker error: " <> tshow e - if temporaryAgentError e - then retryLoop - else done e - where - retryLoop = do - notifyOnRetry <- asks (xftpNotifyErrsOnRetry . config) - when notifyOnRetry $ notifyInternalError c rcvFileEntityId $ show e - closeXFTPServerClient c userId replica - withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay - atomically $ endAgentOperation c AORcvNetwork - atomically $ throwWhenInactive c - atomically $ beginAgentOperation c AORcvNetwork - loop + retryMaintenance e replicaDelay = do + notifyOnRetry <- asks (xftpNotifyErrsOnRetry . config) + when notifyOnRetry $ notify c rcvFileEntityId $ RFERR e + closeXFTPServerClient c userId server replicaId + withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay + retryDone e = rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) (show e) downloadFileChunk :: RcvFileChunk -> RcvFileChunkReplica -> m () downloadFileChunk RcvFileChunk {userId, rcvFileId, rcvFileEntityId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath} replica = do fsFileTmpPath <- toFSFilePath fileTmpPath @@ -178,8 +193,8 @@ runXFTPWorker c srv doWork = do complete = all chunkReceived chunks liftIO . 
when complete $ updateRcvFileStatus db rcvFileId RFSReceived pure (complete, RFPROG rcvd total) - liftIO $ notify c rcvFileEntityId progress - when complete $ addXFTPWorker c Nothing + notify c rcvFileEntityId progress + when complete $ addXFTPRcvWorker c Nothing where receivedSize :: [RcvFileChunk] -> Int64 receivedSize = foldl' (\sz ch -> sz + receivedChunkSize ch) 0 @@ -188,17 +203,28 @@ runXFTPWorker c srv doWork = do | otherwise = 0 chunkReceived RcvFileChunk {replicas} = any received replicas -workerInternalError :: AgentMonad m => AgentClient -> DBRcvFileId -> RcvFileId -> Maybe FilePath -> String -> m () -workerInternalError c rcvFileId rcvFileEntityId tmpPath internalErrStr = do +retryOnError :: AgentMonad m => AgentClient -> AgentOperation -> Text -> m () -> m () -> m () -> AgentErrorType -> m () +retryOnError c agentOp name loop maintenance done e = do + logError $ name <> " error: " <> tshow e + if temporaryAgentError e + then retryLoop + else done + where + retryLoop = do + maintenance `catchError` \_ -> pure () + atomically $ endAgentOperation c agentOp + atomically $ throwWhenInactive c + atomically $ beginAgentOperation c agentOp + loop + +rcvWorkerInternalError :: AgentMonad m => AgentClient -> DBRcvFileId -> RcvFileId -> Maybe FilePath -> String -> m () +rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath internalErrStr = do forM_ tmpPath (removePath <=< toFSFilePath) withStore' c $ \db -> updateRcvFileError db rcvFileId internalErrStr - notifyInternalError c rcvFileEntityId internalErrStr + notify c rcvFileEntityId $ RFERR $ INTERNAL internalErrStr -notifyInternalError :: (MonadUnliftIO m) => AgentClient -> RcvFileId -> String -> m () -notifyInternalError AgentClient {subQ} rcvFileEntityId internalErrStr = atomically $ writeTBQueue subQ ("", rcvFileEntityId, APC SAERcvFile $ RFERR $ INTERNAL internalErrStr) - -runXFTPLocalWorker :: forall m. 
AgentMonad m => AgentClient -> TMVar () -> m () -runXFTPLocalWorker c doWork = do +runXFTPRcvLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m () +runXFTPRcvLocalWorker c doWork = do forever $ do void . atomically $ readTMVar doWork -- TODO agentOperationBracket? @@ -211,7 +237,7 @@ runXFTPLocalWorker c doWork = do case nextFile of Nothing -> noWorkToDo Just f@RcvFile {rcvFileId, rcvFileEntityId, tmpPath} -> - decryptFile f `catchError` (workerInternalError c rcvFileId rcvFileEntityId tmpPath . show) + decryptFile f `catchError` (rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath . show) noWorkToDo = void . atomically $ tryTakeTMVar doWork decryptFile :: RcvFile -> m () decryptFile RcvFile {rcvFileId, rcvFileEntityId, key, nonce, tmpPath, savePath, status, chunks} = do @@ -222,9 +248,9 @@ runXFTPLocalWorker c doWork = do chunkPaths <- getChunkPaths chunks encSize <- liftIO $ foldM (\s path -> (s +) . fromIntegral <$> getFileSize path) 0 chunkPaths void $ liftError (INTERNAL . 
show) $ decryptChunks encSize chunkPaths key nonce $ \_ -> pure fsSavePath + notify c rcvFileEntityId $ RFDONE fsSavePath forM_ tmpPath (removePath <=< toFSFilePath) withStore' c (`updateRcvFileComplete` rcvFileId) - liftIO $ notify c rcvFileEntityId $ RFDONE fsSavePath where getChunkPaths :: [RcvFileChunk] -> m [FilePath] getChunkPaths [] = pure [] @@ -264,7 +290,7 @@ sendFileExperimental c@AgentClient {xftpServers} userId filePath numRecipients = createDirectoryIfMissing False tempPath runSend fileName outputDir tempPath `catchError` \e -> do cleanup outputDir tempPath - liftIO $ notify c sndFileId $ SFERR e + notify c sndFileId $ SFERR e where runSend :: String -> FilePath -> FilePath -> m () runSend fileName outputDir tempPath = do @@ -281,7 +307,7 @@ sendFileExperimental c@AgentClient {xftpServers} userId filePath numRecipients = liftCLI $ cliSendFileOpts sendOptions False $ notify c sndFileId .: SFPROG (sndDescr, rcvDescrs) <- readDescrs outputDir fileName cleanup outputDir tempPath - liftIO $ notify c sndFileId $ SFDONE sndDescr rcvDescrs + notify c sndFileId $ SFDONE sndDescr rcvDescrs cleanup :: FilePath -> FilePath -> m () cleanup outputDir tempPath = do removePath tempPath @@ -299,71 +325,173 @@ sendFileExperimental c@AgentClient {xftpServers} userId filePath numRecipients = readDescr :: FilePartyI p => FilePath -> m (ValidFileDescription p) readDescr f = liftIOEither $ first INTERNAL . strDecode <$> B.readFile f -notify :: forall e. AEntityI e => AgentClient -> EntityId -> ACommand 'Agent e -> IO () +notify :: forall m e. 
(MonadUnliftIO m, AEntityI e) => AgentClient -> EntityId -> ACommand 'Agent e -> m () notify c entId cmd = atomically $ writeTBQueue (subQ c) ("", entId, APC (sAEntity @e) cmd) --- _sendFile :: AgentMonad m => AgentClient -> UserId -> FilePath -> Int -> m SndFileId -_sendFile :: AgentClient -> UserId -> FilePath -> Int -> m SndFileId -_sendFile _c _userId _filePath _numRecipients = do - -- db: create file in status New without chunks - -- add local snd worker for encryption - -- return file id to client - undefined +sendFile :: AgentMonad m => AgentClient -> UserId -> FilePath -> Int -> m SndFileId +sendFile c userId filePath numRecipients = do + g <- asks idsDrg + prefixPath <- getPrefixPath "snd.xftp" + createDirectory prefixPath + let relPrefixPath = takeFileName prefixPath + key <- liftIO C.randomSbKey + nonce <- liftIO C.randomCbNonce + -- saving absolute filePath will not allow to restore file encryption after app update, but it's a short window + fId <- withStore c $ \db -> createSndFile db g userId numRecipients filePath relPrefixPath key nonce + addXFTPSndWorker c Nothing + pure fId -_runXFTPSndLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m () -_runXFTPSndLocalWorker _c doWork = do +addXFTPSndWorker :: AgentMonad m => AgentClient -> Maybe XFTPServer -> m () +addXFTPSndWorker c = addWorker c xftpSndWorkers runXFTPSndWorker runXFTPSndPrepareWorker + +runXFTPSndPrepareWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m () +runXFTPSndPrepareWorker c doWork = do forever $ do void . atomically $ readTMVar doWork + -- TODO agentOperationBracket runXftpOperation where runXftpOperation :: m () runXftpOperation = do - -- db: get next snd file to encrypt (in status New) - -- ? (or Encrypted to retry create? - see below) - -- with fixed retries (?) 
encryptFile - undefined - _encryptFile :: SndFile -> m () - _encryptFile _sndFile = do - -- if enc path exists, remove it - -- if enc path doesn't exist: - -- - choose enc path - -- - touch file, db: update enc path (?) - -- calculate chunk sizes, encrypt file to enc path - -- calculate digest - -- prepare chunk specs - -- db: - -- - update file status to Encrypted - -- - create chunks according to chunk specs - -- ? since which servers are online is unknown, - -- ? we can't blindly assign servers to replicas. - -- ? should we XFTP create chunks on servers here, - -- ? with retrying for different servers, - -- ? keeping a list of servers that were tried? - -- ? then we can add replicas to chunks in db - -- ? and update file status to Uploading, - -- ? probably in same transaction as creating chunks, - -- ? and add XFTP snd workers for uploading chunks. - undefined + nextFile <- withStore' c getNextSndFileToPrepare + case nextFile of + Nothing -> noWorkToDo + Just f@SndFile {sndFileId, sndFileEntityId, prefixPath} -> + prepareFile f `catchError` (sndWorkerInternalError c sndFileId sndFileEntityId prefixPath . show) + noWorkToDo = void . atomically $ tryTakeTMVar doWork + prepareFile :: SndFile -> m () + prepareFile SndFile {prefixPath = Nothing} = + throwError $ INTERNAL "no prefix path" + prepareFile sndFile@SndFile {sndFileId, prefixPath = Just ppath, status} = do + SndFile {numRecipients, chunks} <- + if status /= SFSEncrypted -- status is SFSNew or SFSEncrypting + then do + let encPath = sndFileEncPath ppath + fsEncPath <- toFSFilePath encPath + when (status == SFSEncrypting) $ + whenM (doesFileExist fsEncPath) $ removeFile fsEncPath + withStore' c $ \db -> updateSndFileStatus db sndFileId SFSEncrypting + (digest, chunkSpecs) <- encryptFileForUpload sndFile encPath + withStore c $ \db -> do + updateSndFileEncrypted db sndFileId digest chunkSpecs + getSndFile db sndFileId + else pure sndFile + maxRecipients <- asks (xftpMaxRecipientsPerRequest . 
config) + let numRecipients' = min numRecipients maxRecipients + -- concurrently? + forM_ chunks $ createChunk numRecipients' + withStore' c $ \db -> updateSndFileStatus db sndFileId SFSUploading + where + encryptFileForUpload :: SndFile -> FilePath -> m (FileDigest, [XFTPChunkSpec]) + encryptFileForUpload SndFile {key, nonce, filePath} encPath = do + let fileName = takeFileName filePath + fileSize <- fromInteger <$> getFileSize filePath + when (fileSize > maxFileSize) $ throwError $ INTERNAL "max file size exceeded" + let fileHdr = smpEncode FileHeader {fileName, fileExtra = Nothing} + fileSize' = fromIntegral (B.length fileHdr) + fileSize + chunkSizes = prepareChunkSizes $ fileSize' + fileSizeLen + authTagSize + chunkSizes' = map fromIntegral chunkSizes + encSize = sum chunkSizes' + void $ liftError (INTERNAL . show) $ encryptFile filePath fileHdr key nonce fileSize' encSize encPath + digest <- liftIO $ LC.sha512Hash <$> LB.readFile encPath + let chunkSpecs = prepareChunkSpecs encPath chunkSizes + pure (FileDigest digest, chunkSpecs) + createChunk :: Int -> SndFileChunk -> m () + createChunk numRecipients' SndFileChunk {sndChunkId, userId, chunkSpec} = do + (sndKey, spKey) <- liftIO $ C.generateSignatureKeyPair C.SEd25519 + rKeys <- liftIO $ L.fromList <$> replicateM numRecipients' (C.generateSignatureKeyPair C.SEd25519) + ch@FileInfo {digest} <- liftIO $ getChunkInfo sndKey chunkSpec + -- TODO with retry on temporary errors + srvAuth@(ProtoServerWithAuth srv _) <- getServer + (sndId, rIds) <- agentXFTPCreateChunk c userId srvAuth spKey ch (L.map fst rKeys) + let rcvIdsKeys = L.toList $ L.map ChunkReplicaId rIds `L.zip` L.map snd rKeys + withStore' c $ \db -> createSndFileReplica db sndChunkId (FileDigest digest) srv (ChunkReplicaId sndId) spKey rcvIdsKeys + addXFTPSndWorker c $ Just srv + getServer :: m XFTPServerWithAuth + getServer = do + -- TODO get user servers from config + -- TODO choose next server (per chunk? per file?) 
+ undefined -_runXFTPSndWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m () -_runXFTPSndWorker c _srv doWork = do +sndWorkerInternalError :: AgentMonad m => AgentClient -> DBSndFileId -> SndFileId -> Maybe FilePath -> String -> m () +sndWorkerInternalError c sndFileId sndFileEntityId prefixPath internalErrStr = do + forM_ prefixPath (removePath <=< toFSFilePath) + withStore' c $ \db -> updateSndFileError db sndFileId internalErrStr + notify c sndFileEntityId $ SFERR $ INTERNAL internalErrStr + +runXFTPSndWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m () +runXFTPSndWorker c srv doWork = do forever $ do void . atomically $ readTMVar doWork agentOperationBracket c AOSndNetwork throwWhenInactive runXftpOperation where + noWorkToDo = void . atomically $ tryTakeTMVar doWork runXftpOperation :: m () runXftpOperation = do - -- db: get next snd chunk to upload (replica is not uploaded) - -- with retry interval uploadChunk - -- - with fixed retries, repeat N times: - -- check if other files are in upload, delay (see xftpSndFiles in XFTPAgent) - undefined - _uploadFileChunk :: SndFileChunk -> m () - _uploadFileChunk _sndFileChunk = do - -- add file id to xftpSndFiles - -- XFTP upload chunk - -- db: update replica status to Uploaded, return SndFile - -- if all SndFile's replicas are uploaded: - -- - serialize file descriptions and notify client - -- - remove file id from xftpSndFiles - undefined + nextChunk <- withStore' c $ \db -> getNextSndChunkToUpload db srv + case nextChunk of + Nothing -> noWorkToDo + Just SndFileChunk {sndFileId, sndFileEntityId, filePrefixPath, replicas = []} -> sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) "chunk has no replicas" + Just fc@SndFileChunk {userId, sndFileId, sndFileEntityId, filePrefixPath, replicas = replica@SndFileChunkReplica {sndChunkReplicaId, server, replicaId, delay} : _} -> do + ri <- asks $ reconnectInterval . 
config + let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay + withRetryInterval ri' $ \delay' loop -> + uploadFileChunk fc replica + `catchError` \e -> retryOnError c AOSndNetwork "XFTP snd worker" loop (retryMaintenance e delay') (retryDone e) e + where + retryMaintenance e replicaDelay = do + notifyOnRetry <- asks (xftpNotifyErrsOnRetry . config) + when notifyOnRetry $ notify c sndFileEntityId $ SFERR e + closeXFTPServerClient c userId server replicaId + withStore' c $ \db -> updateSndChunkReplicaDelay db sndChunkReplicaId replicaDelay + retryDone e = sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) (show e) + uploadFileChunk :: SndFileChunk -> SndFileChunkReplica -> m () + uploadFileChunk sndFileChunk@SndFileChunk {sndFileId, userId, chunkSpec} replica = do + replica'@SndFileChunkReplica {sndChunkReplicaId} <- addRecipients sndFileChunk replica + agentXFTPUploadChunk c userId replica' chunkSpec + sf@SndFile {sndFileEntityId, prefixPath, chunks} <- withStore c $ \db -> do + updateSndChunkReplicaStatus db sndChunkReplicaId SFRSUploaded + getSndFile db sndFileId + let complete = all chunkUploaded chunks + -- TODO calculate progress, notify SFPROG + when complete $ do + sndDescr <- sndFileToSndDescr sf + rcvDescrs <- sndFileToRcvDescrs sf + notify c sndFileEntityId $ SFDONE sndDescr rcvDescrs + forM_ prefixPath (removePath <=< toFSFilePath) + withStore' c $ \db -> updateSndFileStatus db sndFileId SFSComplete + where + addRecipients :: SndFileChunk -> SndFileChunkReplica -> m SndFileChunkReplica + addRecipients ch@SndFileChunk {numRecipients} cr@SndFileChunkReplica {rcvIdsKeys} + | length rcvIdsKeys > numRecipients = throwError $ INTERNAL "too many recipients" + | length rcvIdsKeys == numRecipients = pure cr + | otherwise = do + maxRecipients <- asks (xftpMaxRecipientsPerRequest . 
config) + let numRecipients' = min (numRecipients - length rcvIdsKeys) maxRecipients + rKeys <- liftIO $ L.fromList <$> replicateM numRecipients' (C.generateSignatureKeyPair C.SEd25519) + rIds <- agentXFTPAddRecipients c userId cr (L.map fst rKeys) + let rcvIdsKeys' = L.toList $ L.map ChunkReplicaId rIds `L.zip` L.map snd rKeys + cr' <- withStore' c $ \db -> addSndChunkReplicaRecipients db cr rcvIdsKeys' + addRecipients ch cr' + sndFileToSndDescr :: SndFile -> m (ValidFileDescription 'FSender) + sndFileToSndDescr SndFile {digest = Nothing} = throwError $ INTERNAL "snd file has no digest" + sndFileToSndDescr SndFile {chunks = []} = throwError $ INTERNAL "snd file has no chunks" + sndFileToSndDescr SndFile {digest = Just digest, key, nonce, chunks = chunks@(fstChunk : _)} = do + let chunkSize = FileSize $ sndChunkSize fstChunk + size = FileSize $ sum $ map (fromIntegral . sndChunkSize) chunks + descrChunks <- mapM toDescrChunk chunks + let fd = FileDescription {party = SFSender, size, digest, key, nonce, chunkSize, chunks = descrChunks} + either (throwError . 
INTERNAL) pure $ validateFileDescription' fd + where + toDescrChunk :: SndFileChunk -> m FileChunk + toDescrChunk SndFileChunk {digest = Nothing} = throwError $ INTERNAL "snd file chunk has no digest" + toDescrChunk SndFileChunk {replicas = []} = throwError $ INTERNAL "snd file chunk has no replicas" + toDescrChunk ch@SndFileChunk {chunkNo, digest = Just chDigest, replicas = (SndFileChunkReplica {server, replicaId, replicaKey} : _)} = do + let chunkSize = FileSize $ sndChunkSize ch + replicas = [FileChunkReplica {server, replicaId, replicaKey}] + pure FileChunk {chunkNo, digest = chDigest, chunkSize, replicas} + sndFileToRcvDescrs :: SndFile -> m [ValidFileDescription 'FRecipient] + sndFileToRcvDescrs SndFile {} = do + undefined + chunkUploaded SndFileChunk {replicas} = + any (\SndFileChunkReplica {replicaStatus} -> replicaStatus == SFRSUploaded) replicas diff --git a/src/Simplex/FileTransfer/Client/Main.hs b/src/Simplex/FileTransfer/Client/Main.hs index 7f9186c73..9dcd64c5e 100644 --- a/src/Simplex/FileTransfer/Client/Main.hs +++ b/src/Simplex/FileTransfer/Client/Main.hs @@ -16,6 +16,13 @@ module Simplex.FileTransfer.Client.Main cliSendFile, cliSendFileOpts, prepareChunkSizes, + prepareChunkSpecs, + chunkSize1, + chunkSize2, + chunkSize3, + maxFileSize, + fileSizeLen, + getChunkInfo, ) where @@ -332,12 +339,6 @@ cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, re let recipients = L.toList $ L.map ChunkReplicaId rIds `L.zip` L.map snd rKeys replicas = [SentFileChunkReplica {server = xftpServer, recipients}] pure (chunkNo, SentFileChunk {chunkNo, sndId, sndPrivateKey = spKey, chunkSize = FileSize $ fromIntegral chunkSize, digest = FileDigest digest, replicas}) - getChunkInfo :: SndPublicVerifyKey -> XFTPChunkSpec -> IO FileInfo - getChunkInfo sndKey XFTPChunkSpec {filePath = chunkPath, chunkOffset, chunkSize} = - withFile chunkPath ReadMode $ \h -> do - hSeek h AbsoluteSeek $ fromIntegral chunkOffset - digest <- LC.sha256Hash <$> LB.hGet 
h (fromIntegral chunkSize) - pure FileInfo {sndKey, size = fromIntegral chunkSize, digest} getXFTPServer :: TVar StdGen -> NonEmpty XFTPServerWithAuth -> IO XFTPServerWithAuth getXFTPServer gen = \case srv :| [] -> pure srv @@ -406,6 +407,13 @@ cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, re B.writeFile fdSndPath $ strEncode fdSnd pure (fdRcvPaths, fdSndPath) +getChunkInfo :: SndPublicVerifyKey -> XFTPChunkSpec -> IO FileInfo +getChunkInfo sndKey XFTPChunkSpec {filePath = chunkPath, chunkOffset, chunkSize} = + withFile chunkPath ReadMode $ \h -> do + hSeek h AbsoluteSeek $ fromIntegral chunkOffset + digest <- LC.sha256Hash <$> LB.hGet h (fromIntegral chunkSize) + pure FileInfo {sndKey, size = fromIntegral chunkSize, digest} + cliReceiveFile :: ReceiveOptions -> ExceptT CLIError IO () cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath, verbose, yes} = getFileDescription' fileDescription >>= receiveFile diff --git a/src/Simplex/FileTransfer/Description.hs b/src/Simplex/FileTransfer/Description.hs index c995f0dcd..506ded18b 100644 --- a/src/Simplex/FileTransfer/Description.hs +++ b/src/Simplex/FileTransfer/Description.hs @@ -25,6 +25,7 @@ module Simplex.FileTransfer.Description YAMLFileDescription (..), -- for tests YAMLServerReplicas (..), -- for tests validateFileDescription, + validateFileDescription', groupReplicasByServer, replicaServer, fdSeparator, @@ -205,6 +206,17 @@ validateFileDescription = \case chunkNos = map (chunkNo :: FileChunk -> Int) chunks chunksSize = fromIntegral . foldl' (\s FileChunk {chunkSize} -> s + unFileSize chunkSize) 0 +-- TODO refactor +validateFileDescription' :: FileDescription p -> Either String (ValidFileDescription p) +validateFileDescription' = \case + fd@FileDescription {size, chunks} + | chunkNos /= [1 .. 
length chunks] -> Left "chunk numbers are not sequential" + | chunksSize chunks /= unFileSize size -> Left "chunks total size is different than file size" + | otherwise -> Right $ ValidFD fd + where + chunkNos = map (chunkNo :: FileChunk -> Int) chunks + chunksSize = fromIntegral . foldl' (\s FileChunk {chunkSize} -> s + unFileSize chunkSize) 0 + encodeFileDescription :: FileDescription p -> YAMLFileDescription encodeFileDescription FileDescription {party, size, digest, key, nonce, chunkSize, chunks} = YAMLFileDescription diff --git a/src/Simplex/FileTransfer/Types.hs b/src/Simplex/FileTransfer/Types.hs index b70d9c1c7..ebdcbf99a 100644 --- a/src/Simplex/FileTransfer/Types.hs +++ b/src/Simplex/FileTransfer/Types.hs @@ -11,12 +11,13 @@ import Database.SQLite.Simple.FromField (FromField (..)) import Database.SQLite.Simple.ToField (ToField (..)) import Simplex.FileTransfer.Client (XFTPChunkSpec (..)) import Simplex.FileTransfer.Description -import Simplex.Messaging.Agent.Protocol (RcvFileId) +import Simplex.Messaging.Agent.Protocol (RcvFileId, SndFileId) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String import Simplex.Messaging.Parsers (fromTextField_) import Simplex.Messaging.Protocol +import System.FilePath (()) authTagSize :: Int64 authTagSize = fromIntegral C.authTagSize @@ -111,26 +112,31 @@ data RcvFileChunkReplica = RcvFileChunkReplica type DBSndFileId = Int64 data SndFile = SndFile - { userId :: Int64, - sndFileId :: DBSndFileId, - size :: FileSize Int64, - digest :: FileDigest, + { sndFileId :: DBSndFileId, + sndFileEntityId :: SndFileId, + userId :: Int64, + numRecipients :: Int, + digest :: Maybe FileDigest, key :: C.SbKey, nonce :: C.CbNonce, - chunkSize :: FileSize Word32, - chunks :: [RcvFileChunk], - path :: FilePath, - encPath :: Maybe FilePath, - status :: SndFileStatus + chunks :: [SndFileChunk], + filePath :: FilePath, + prefixPath :: Maybe FilePath, + status :: SndFileStatus, + 
deleted :: Bool } deriving (Eq, Show) +sndFileEncPath :: FilePath -> FilePath +sndFileEncPath prefixPath = prefixPath "xftp.encrypted" + data SndFileStatus - = SFSNew - | SFSEncrypting - | SFSEncrypted - | SFSUploading - | SFSComplete + = SFSNew -- db record created + | SFSEncrypting -- encryption started + | SFSEncrypted -- encryption complete + | SFSUploading -- all chunk replicas are created on servers + | SFSComplete -- all chunk replicas are uploaded + | SFSError -- permanent error deriving (Eq, Show) instance FromField SndFileStatus where fromField = fromTextField_ textDecode @@ -144,6 +150,7 @@ instance TextEncoding SndFileStatus where "encrypted" -> Just SFSEncrypted "uploading" -> Just SFSUploading "complete" -> Just SFSComplete + "error" -> Just SFSError _ -> Nothing textEncode = \case SFSNew -> "new" @@ -151,34 +158,51 @@ instance TextEncoding SndFileStatus where SFSEncrypted -> "encrypted" SFSUploading -> "uploading" SFSComplete -> "complete" + SFSError -> "error" data SndFileChunk = SndFileChunk - { userId :: Int64, - sndFileId :: DBSndFileId, + { sndFileId :: DBSndFileId, + sndFileEntityId :: SndFileId, + userId :: Int64, + numRecipients :: Int, sndChunkId :: Int64, chunkNo :: Int, chunkSpec :: XFTPChunkSpec, - digest :: FileDigest, - replicas :: [SndFileChunkReplica], - delay :: Maybe Int + filePrefixPath :: FilePath, + digest :: Maybe FileDigest, + replicas :: [SndFileChunkReplica] } deriving (Eq, Show) +sndChunkSize :: SndFileChunk -> Word32 +sndChunkSize SndFileChunk {chunkSpec = XFTPChunkSpec {chunkSize}} = chunkSize + data SndFileChunkReplica = SndFileChunkReplica { sndChunkReplicaId :: Int64, server :: XFTPServer, replicaId :: ChunkReplicaId, replicaKey :: C.APrivateSignKey, rcvIdsKeys :: [(ChunkReplicaId, C.APrivateSignKey)], - -- created :: Bool, - uploaded :: Bool, + replicaStatus :: SndFileReplicaStatus, + delay :: Maybe Int64, retries :: Int } deriving (Eq, Show) --- to be used in reply to client -data SndFileDescription = 
SndFileDescription - { description :: String, - sender :: Bool - } +data SndFileReplicaStatus + = SFRSCreated + | SFRSUploaded deriving (Eq, Show) + +instance FromField SndFileReplicaStatus where fromField = fromTextField_ textDecode + +instance ToField SndFileReplicaStatus where toField = toField . textEncode + +instance TextEncoding SndFileReplicaStatus where + textDecode = \case + "created" -> Just SFRSCreated + "uploaded" -> Just SFRSUploaded + _ -> Nothing + textEncode = \case + SFRSCreated -> "created" + SFRSUploaded -> "uploaded" diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index ef9ab0710..6f872e9ce 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -53,6 +53,9 @@ module Simplex.Messaging.Agent.Client agentNtfCheckSubscription, agentNtfDeleteSubscription, agentXFTPDownloadChunk, + agentXFTPCreateChunk, + agentXFTPUploadChunk, + agentXFTPAddRecipients, agentCbEncrypt, agentCbDecrypt, cryptoError, @@ -119,12 +122,12 @@ import Data.Word (Word16) import qualified Database.SQLite.Simple as DB import GHC.Generics (Generic) import Network.Socket (HostName) -import Simplex.FileTransfer.Client (XFTPClient, XFTPClientConfig (..), XFTPClientError) +import Simplex.FileTransfer.Client (XFTPChunkSpec, XFTPClient, XFTPClientConfig (..), XFTPClientError) import qualified Simplex.FileTransfer.Client as X import Simplex.FileTransfer.Description (ChunkReplicaId (..), kb) import Simplex.FileTransfer.Protocol (FileInfo (..), FileResponse, XFTPErrorType (DIGEST)) import Simplex.FileTransfer.Transport (XFTPRcvChunkSpec (..)) -import Simplex.FileTransfer.Types (RcvFileChunkReplica (..)) +import Simplex.FileTransfer.Types (RcvFileChunkReplica (..), SndFileChunkReplica (..)) import Simplex.FileTransfer.Util (uniqueCombine) import Simplex.Messaging.Agent.Env.SQLite import Simplex.Messaging.Agent.Lock @@ -163,8 +166,11 @@ import Simplex.Messaging.Protocol QueueIdsKeys (..), RcvMessage (..), 
RcvNtfPublicDhKey, + RecipientId, SMPMsgMeta (..), + SenderId, SndPublicVerifyKey, + XFTPServer, XFTPServerWithAuth, ) import qualified Simplex.Messaging.Protocol as SMP @@ -606,8 +612,8 @@ closeClient_ c cVar = do Just (Right client) -> closeProtocolServerClient client `catchAll_` pure () _ -> pure () -closeXFTPServerClient :: AgentMonad' m => AgentClient -> UserId -> RcvFileChunkReplica -> m () -closeXFTPServerClient c userId RcvFileChunkReplica {server, replicaId = ChunkReplicaId fId} = +closeXFTPServerClient :: AgentMonad' m => AgentClient -> UserId -> XFTPServer -> ChunkReplicaId -> m () +closeXFTPServerClient c userId server (ChunkReplicaId fId) = mkTransportSession c userId server fId >>= liftIO . closeClient c xftpClients cancelActions :: (Foldable f, Monoid (f (Async ()))) => TVar (f (Async ())) -> IO () @@ -1064,6 +1070,18 @@ agentXFTPDownloadChunk :: AgentMonad m => AgentClient -> UserId -> RcvFileChunkR agentXFTPDownloadChunk c userId RcvFileChunkReplica {server, replicaId = ChunkReplicaId fId, replicaKey} chunkSpec = withXFTPClient c (userId, server, fId) "FGET" $ \xftp -> X.downloadXFTPChunk xftp replicaKey fId chunkSpec +agentXFTPCreateChunk :: AgentMonad m => AgentClient -> UserId -> XFTPServerWithAuth -> C.APrivateSignKey -> FileInfo -> NonEmpty C.APublicVerifyKey -> m (SenderId, NonEmpty RecipientId) +agentXFTPCreateChunk c userId srv spKey file rcps = + undefined + +agentXFTPUploadChunk :: AgentMonad m => AgentClient -> UserId -> SndFileChunkReplica -> XFTPChunkSpec -> m () +agentXFTPUploadChunk c usedId SndFileChunkReplica {server, replicaId = ChunkReplicaId fId, replicaKey} chunkSpec = + undefined + +agentXFTPAddRecipients :: AgentMonad m => AgentClient -> UserId -> SndFileChunkReplica -> NonEmpty C.APublicVerifyKey -> m (NonEmpty RecipientId) +agentXFTPAddRecipients c usedId SndFileChunkReplica {server, replicaId = ChunkReplicaId fId, replicaKey} rcps = + undefined + agentCbEncrypt :: AgentMonad m => SndQueue -> Maybe C.PublicKeyX25519 -> 
ByteString -> m ByteString agentCbEncrypt SndQueue {e2eDhSecret, smpClientVersion} e2ePubKey msg = do cmNonce <- liftIO C.randomCbNonce diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index 7cb7509f0..c7b361b31 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -84,6 +84,7 @@ data AgentConfig = AgentConfig cleanupInterval :: Int64, rcvFilesTTL :: NominalDiffTime, xftpNotifyErrsOnRetry :: Bool, + xftpMaxRecipientsPerRequest :: Int, deleteErrorCount :: Int, ntfCron :: Word16, ntfWorkerDelay :: Int, @@ -145,6 +146,7 @@ defaultAgentConfig = cleanupInterval = 30 * 60 * 1000000, -- 30 minutes rcvFilesTTL = 2 * nominalDay, xftpNotifyErrsOnRetry = True, + xftpMaxRecipientsPerRequest = 200, deleteErrorCount = 10, ntfCron = 20, -- minutes ntfWorkerDelay = 100000, -- microseconds @@ -205,17 +207,15 @@ newNtfSubSupervisor qSize = do data XFTPAgent = XFTPAgent { -- if set, XFTP file paths will be considered as relative to this directory xftpWorkDir :: TVar (Maybe FilePath), - xftpWorkers :: TMap (Maybe XFTPServer) (TMVar (), Async ()) + xftpRcvWorkers :: TMap (Maybe XFTPServer) (TMVar (), Async ()), -- separate send workers for unhindered concurrency between download and upload, -- clients can also be separate by passing direction to withXFTPClient, and differentiating by it - -- xftpSndWorkers :: TMap (Maybe XFTPServer) (TMVar (), Async ()), - -- files currently in upload - to throttle upload of other files' chunks, - -- this optimization can be dropped for the MVP - -- xftpSndFiles :: TVar (Set DBSndFileId) + xftpSndWorkers :: TMap (Maybe XFTPServer) (TMVar (), Async ()) } newXFTPAgent :: STM XFTPAgent newXFTPAgent = do xftpWorkDir <- newTVar Nothing - xftpWorkers <- TM.empty - pure XFTPAgent {xftpWorkDir, xftpWorkers} + xftpRcvWorkers <- TM.empty + xftpSndWorkers <- TM.empty + pure XFTPAgent {xftpWorkDir, xftpRcvWorkers, xftpSndWorkers} diff --git 
a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 99e87fc3f..4e5edf996 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -129,9 +129,11 @@ module Simplex.Messaging.Agent.Store.SQLite getActiveNtfToken, getNtfRcvQueue, setConnectionNtfs, - -- File transfer + + -- * File transfer + + -- Rcv files createRcvFile, - getRcvFile, getRcvFileByEntityId, updateRcvChunkReplicaDelay, updateRcvFileChunkReceived, @@ -147,6 +149,19 @@ module Simplex.Messaging.Agent.Store.SQLite getCleanupRcvFilesTmpPaths, getCleanupRcvFilesDeleted, getRcvFilesExpired, + -- Snd files + createSndFile, + getSndFile, + getNextSndFileToPrepare, + updateSndFileError, + updateSndFileStatus, + updateSndFileEncrypted, + updateSndFileComplete, + createSndFileReplica, + getNextSndChunkToUpload, + updateSndChunkReplicaDelay, + addSndChunkReplicaRecipients, + updateSndChunkReplicaStatus, -- * utilities withConnection, @@ -191,6 +206,7 @@ import Database.SQLite.Simple.ToField (ToField (..)) import qualified Database.SQLite3 as SQLite3 import GHC.Generics (Generic) import Network.Socket (ServiceName) +import Simplex.FileTransfer.Client (XFTPChunkSpec (..)) import Simplex.FileTransfer.Description import Simplex.FileTransfer.Protocol (FileParty (..)) import Simplex.FileTransfer.Types @@ -1868,9 +1884,9 @@ getRcvFileIdByEntityId_ db userId rcvFileEntityId = getRcvFile :: DB.Connection -> DBRcvFileId -> IO (Either StoreError RcvFile) getRcvFile db rcvFileId = runExceptT $ do - fd@RcvFile {rcvFileEntityId, userId, tmpPath} <- ExceptT getFile + f@RcvFile {rcvFileEntityId, userId, tmpPath} <- ExceptT getFile chunks <- maybe (pure []) (liftIO . getChunks rcvFileEntityId userId) tmpPath - pure (fd {chunks} :: RcvFile) + pure (f {chunks} :: RcvFile) where getFile :: IO (Either StoreError RcvFile) getFile = do @@ -2075,3 +2091,218 @@ getRcvFilesExpired db ttl = do WHERE created_at < ? 
|] (Only cutoffTs) + +createSndFile :: DB.Connection -> TVar ChaChaDRG -> UserId -> Int -> FilePath -> FilePath -> C.SbKey -> C.CbNonce -> IO (Either StoreError SndFileId) +createSndFile db gVar userId numRecipients path prefixPath key nonce = + createWithRandomId gVar $ \sndFileEntityId -> + DB.execute + db + "INSERT INTO snd_files (snd_file_entity_id, user_id, num_recipients, key, nonce, path, prefix_path, status) VALUES (?,?,?,?,?,?,?,?)" + (sndFileEntityId, userId, numRecipients, key, nonce, path, prefixPath, SFSNew) + +getSndFile :: DB.Connection -> DBSndFileId -> IO (Either StoreError SndFile) +getSndFile db sndFileId = runExceptT $ do + f@SndFile {sndFileEntityId, userId, numRecipients, prefixPath} <- ExceptT getFile + chunks <- maybe (pure []) (liftIO . getChunks sndFileEntityId userId numRecipients) prefixPath + pure (f {chunks} :: SndFile) + where + getFile :: IO (Either StoreError SndFile) + getFile = do + firstRow toFile SEFileNotFound $ + DB.query + db + [sql| + SELECT snd_file_entity_id, user_id, num_recipients, digest, key, nonce, path, prefix_path, status, deleted + FROM snd_files + WHERE snd_file_id = ? + |] + (Only sndFileId) + where + toFile :: (SndFileId, UserId, Int, Maybe FileDigest, C.SbKey, C.CbNonce, FilePath, Maybe FilePath, SndFileStatus, Bool) -> SndFile + toFile (sndFileEntityId, userId, numRecipients, digest, key, nonce, filePath, prefixPath, status, deleted) = + SndFile {sndFileId, sndFileEntityId, userId, numRecipients, digest, key, nonce, filePath, prefixPath, status, deleted, chunks = []} + getChunks :: SndFileId -> UserId -> Int -> FilePath -> IO [SndFileChunk] + getChunks sndFileEntityId userId numRecipients filePrefixPath = do + chunks <- + map toChunk + <$> DB.query + db + [sql| + SELECT snd_file_chunk_id, chunk_no, chunk_offset, chunk_size, digest + FROM snd_file_chunks + WHERE snd_file_id = ? 
+ |] + (Only sndFileId) + forM chunks $ \chunk@SndFileChunk {sndChunkId} -> do + replicas' <- getChunkReplicas sndChunkId + pure (chunk {replicas = replicas'} :: SndFileChunk) + where + toChunk :: (Int64, Int, Int64, Word32, Maybe FileDigest) -> SndFileChunk + toChunk (sndChunkId, chunkNo, chunkOffset, chunkSize, digest) = + let chunkSpec = XFTPChunkSpec {filePath = sndFileEncPath filePrefixPath, chunkOffset, chunkSize} + in SndFileChunk {sndFileId, sndFileEntityId, userId, numRecipients, sndChunkId, chunkNo, chunkSpec, filePrefixPath, digest, replicas = []} + getChunkReplicas :: Int64 -> IO [SndFileChunkReplica] + getChunkReplicas chunkId = do + replicas <- + map toReplica + <$> DB.query + db + [sql| + SELECT + r.snd_file_chunk_replica_id, r.replica_id, r.replica_key, r.replica_status, r.delay, r.retries, + s.xftp_host, s.xftp_port, s.xftp_key_hash + FROM snd_file_chunk_replicas r + JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id + WHERE r.snd_file_chunk_id = ? + |] + (Only chunkId) + forM replicas $ \replica@SndFileChunkReplica {sndChunkReplicaId} -> do + rcvIdsKeys <- getChunkReplicaRecipients_ db sndChunkReplicaId + pure replica {rcvIdsKeys} + where + toReplica :: (Int64, ChunkReplicaId, C.APrivateSignKey, SndFileReplicaStatus, Maybe Int64, Int, NonEmpty TransportHost, ServiceName, C.KeyHash) -> SndFileChunkReplica + toReplica (sndChunkReplicaId, replicaId, replicaKey, replicaStatus, delay, retries, host, port, keyHash) = + let server = XFTPServer host port keyHash + in SndFileChunkReplica {sndChunkReplicaId, server, replicaId, replicaKey, replicaStatus, delay, retries, rcvIdsKeys = []} + +getChunkReplicaRecipients_ :: DB.Connection -> Int64 -> IO [(ChunkReplicaId, C.APrivateSignKey)] +getChunkReplicaRecipients_ db replicaId = + DB.query + db + [sql| + SELECT rcv_replica_id, rcv_replica_key + FROM snd_file_chunk_replica_recipients + WHERE snd_file_chunk_replica_id = ? 
+ |] + (Only replicaId) + +getNextSndFileToPrepare :: DB.Connection -> IO (Maybe SndFile) +getNextSndFileToPrepare db = do + fileId_ :: Maybe DBSndFileId <- + maybeFirstRow fromOnly $ + DB.query + db + [sql| + SELECT snd_file_id + FROM snd_files + WHERE status IN (?,?,?) AND deleted = 0 + ORDER BY created_at ASC LIMIT 1 + |] + (SFSNew, SFSEncrypting, SFSEncrypted) + case fileId_ of + Nothing -> pure Nothing + Just fileId -> eitherToMaybe <$> getSndFile db fileId + +updateSndFileError :: DB.Connection -> DBSndFileId -> String -> IO () +updateSndFileError db sndFileId errStr = do + updatedAt <- getCurrentTime + DB.execute db "UPDATE snd_files SET prefix_path = NULL, error = ?, status = ?, updated_at = ? WHERE snd_file_id = ?" (errStr, SFSError, updatedAt, sndFileId) + +updateSndFileStatus :: DB.Connection -> DBSndFileId -> SndFileStatus -> IO () +updateSndFileStatus db sndFileId status = do + updatedAt <- getCurrentTime + DB.execute db "UPDATE snd_files SET status = ?, updated_at = ? WHERE snd_file_id = ?" (status, updatedAt, sndFileId) + +updateSndFileEncrypted :: DB.Connection -> DBSndFileId -> FileDigest -> [XFTPChunkSpec] -> IO () +updateSndFileEncrypted db sndFileId digest chunkSpecs = do + updatedAt <- getCurrentTime + DB.execute db "UPDATE snd_files SET status = ?, digest = ?, updated_at = ? WHERE snd_file_id = ?" (SFSEncrypted, digest, updatedAt, sndFileId) + forM_ (zip [1 ..] chunkSpecs) $ \(chunkNo :: Int, XFTPChunkSpec {chunkOffset, chunkSize}) -> + DB.execute db "INSERT INTO snd_file_chunks (snd_file_id, chunk_no, chunk_offset, chunk_size) VALUES (?,?,?,?)" (sndFileId, chunkNo, chunkOffset, chunkSize) + +updateSndFileComplete :: DB.Connection -> DBSndFileId -> IO () +updateSndFileComplete db sndFileId = do + updatedAt <- getCurrentTime + DB.execute db "UPDATE snd_files SET prefix_path = NULL, status = ?, updated_at = ? WHERE snd_file_id = ?" 
(SFSComplete, updatedAt, sndFileId) + +createSndFileReplica :: DB.Connection -> Int64 -> FileDigest -> XFTPServer -> ChunkReplicaId -> C.APrivateSignKey -> [(ChunkReplicaId, C.APrivateSignKey)] -> IO () +createSndFileReplica db sndChunkId digest xftpServer sndId spKey rcvIdsKeys = do + srvId <- createXFTPServer_ db xftpServer + updatedAt <- getCurrentTime + DB.execute db "UPDATE snd_file_chunks SET digest = ?, updated_at = ? WHERE snd_file_chunk_id = ?" (digest, updatedAt, sndChunkId) + DB.execute + db + [sql| + INSERT INTO snd_file_chunk_replicas + (snd_file_chunk_id, replica_number, xftp_server_id, replica_id, replica_key, replica_status) + VALUES (?,?,?,?,?,?) + |] + (sndChunkId, 1 :: Int, srvId, sndId, spKey, SFRSCreated) + rId <- insertedRowId db + forM_ rcvIdsKeys $ \(rcvId, rcvKey) -> do + DB.execute + db + [sql| + INSERT INTO snd_file_chunk_replica_recipients + (snd_file_chunk_replica_id, rcv_replica_id, rcv_replica_key) + VALUES (?,?,?) + |] + (rId, rcvId, rcvKey) + +getNextSndChunkToUpload :: DB.Connection -> XFTPServer -> IO (Maybe SndFileChunk) +getNextSndChunkToUpload db server@ProtocolServer {host, port, keyHash} = do + chunk_ <- + maybeFirstRow toChunk $ + DB.query + db + [sql| + SELECT + f.snd_file_id, f.snd_file_entity_id, f.user_id, f.num_recipients, f.prefix_path, + c.snd_file_chunk_id, c.chunk_no, c.chunk_offset, c.chunk_size, c.digest, + r.snd_file_chunk_replica_id, r.replica_id, r.replica_key, r.replica_status, r.delay, r.retries + FROM snd_file_chunk_replicas r + JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id + JOIN snd_file_chunks c ON c.snd_file_chunk_id = r.snd_file_chunk_id + JOIN snd_files f ON f.snd_file_id = c.snd_file_id + WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ? + AND r.replica_status = ? AND r.replica_number = 1 + AND (f.status = ? OR f.status = ?)
AND f.deleted = 0 + ORDER BY r.created_at ASC + LIMIT 1 + |] + (host, port, keyHash, SFRSCreated, SFSEncrypted, SFSUploading) + forM chunk_ $ \chunk@SndFileChunk {replicas} -> do + replicas' <- forM replicas $ \replica@SndFileChunkReplica {sndChunkReplicaId} -> do + rcvIdsKeys <- getChunkReplicaRecipients_ db sndChunkReplicaId + pure replica {rcvIdsKeys} + pure (chunk {replicas = replicas'} :: SndFileChunk) + where + toChunk :: ((DBSndFileId, SndFileId, UserId, Int, FilePath) :. (Int64, Int, Int64, Word32, Maybe FileDigest) :. (Int64, ChunkReplicaId, C.APrivateSignKey, SndFileReplicaStatus, Maybe Int64, Int)) -> SndFileChunk + toChunk ((sndFileId, sndFileEntityId, userId, numRecipients, filePrefixPath) :. (sndChunkId, chunkNo, chunkOffset, chunkSize, digest) :. (sndChunkReplicaId, replicaId, replicaKey, replicaStatus, delay, retries)) = + let chunkSpec = XFTPChunkSpec {filePath = sndFileEncPath filePrefixPath, chunkOffset, chunkSize} + in SndFileChunk + { sndFileId, + sndFileEntityId, + userId, + numRecipients, + sndChunkId, + chunkNo, + chunkSpec, + digest, + filePrefixPath, + replicas = [SndFileChunkReplica {sndChunkReplicaId, server, replicaId, replicaKey, replicaStatus, delay, retries, rcvIdsKeys = []}] + } + +updateSndChunkReplicaDelay :: DB.Connection -> Int64 -> Int64 -> IO () +updateSndChunkReplicaDelay db replicaId delay = do + updatedAt <- getCurrentTime + DB.execute db "UPDATE snd_file_chunk_replicas SET delay = ?, retries = retries + 1, updated_at = ? WHERE snd_file_chunk_replica_id = ?" (delay, updatedAt, replicaId) + +addSndChunkReplicaRecipients :: DB.Connection -> SndFileChunkReplica -> [(ChunkReplicaId, C.APrivateSignKey)] -> IO SndFileChunkReplica +addSndChunkReplicaRecipients db r@SndFileChunkReplica {sndChunkReplicaId} rcvIdsKeys = do + forM_ rcvIdsKeys $ \(rcvId, rcvKey) -> do + DB.execute + db + [sql| + INSERT INTO snd_file_chunk_replica_recipients + (snd_file_chunk_replica_id, rcv_replica_id, rcv_replica_key) + VALUES (?,?,?) 
+ |] + (sndChunkReplicaId, rcvId, rcvKey) + rcvIdsKeys' <- getChunkReplicaRecipients_ db sndChunkReplicaId + pure r {rcvIdsKeys = rcvIdsKeys'} + +updateSndChunkReplicaStatus :: DB.Connection -> Int64 -> SndFileReplicaStatus -> IO () +updateSndChunkReplicaStatus db replicaId status = do + updatedAt <- getCurrentTime + DB.execute db "UPDATE snd_file_chunk_replicas SET replica_status = ?, updated_at = ? WHERE snd_file_chunk_replica_id = ?" (status, updatedAt, replicaId) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs index 44dc85a07..182c5effa 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs @@ -57,6 +57,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230120_delete_errors import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230217_server_key_hash import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230223_files import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230320_retry_state +import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230401_snd_files import Simplex.Messaging.Encoding.String import Simplex.Messaging.Parsers (dropPrefix, sumTypeJSON) import Simplex.Messaging.Transport.Client (TransportHost) @@ -80,7 +81,8 @@ schemaMigrations = ("m20230120_delete_errors", m20230120_delete_errors, Nothing), ("m20230217_server_key_hash", m20230217_server_key_hash, Nothing), ("m20230223_files", m20230223_files, Just down_m20230223_files), - ("m20230320_retry_state", m20230320_retry_state, Just down_m20230320_retry_state) + ("m20230320_retry_state", m20230320_retry_state, Just down_m20230320_retry_state), + ("m20230401_snd_files", m20230401_snd_files, Just down_m20230401_snd_files) ] -- | The list of migrations in ascending order by date diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230401_snd_files.hs
b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230401_snd_files.hs index df5754fb2..7db1a9f78 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230401_snd_files.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230401_snd_files.hs @@ -5,21 +5,22 @@ module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230401_snd_files where import Database.SQLite.Simple (Query) import Database.SQLite.Simple.QQ (sql) --- this migration is a draft - it is not included in the list of migrations m20230401_snd_files :: Query m20230401_snd_files = [sql| CREATE TABLE snd_files ( - snd_file_id INTEGER PRIMARY KEY AUTOINCREMENT, + snd_file_id INTEGER PRIMARY KEY, + snd_file_entity_id BLOB NOT NULL, user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE, - size INTEGER NOT NULL, - digest BLOB NOT NULL, - key BLOB NOT NULL, - nonce BLOB NOT NULL, - chunk_size INTEGER NOT NULL, + num_recipients INTEGER NOT NULL, + digest BLOB, + key BLOB NOT NULL, + nonce BLOB NOT NULL, path TEXT NOT NULL, - enc_path TEXT, + prefix_path TEXT, status TEXT NOT NULL, + deleted INTEGER NOT NULL DEFAULT 0, + error TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')), updated_at TEXT NOT NULL DEFAULT (datetime('now')) ); @@ -32,18 +33,13 @@ CREATE TABLE snd_file_chunks ( chunk_no INTEGER NOT NULL, chunk_offset INTEGER NOT NULL, chunk_size INTEGER NOT NULL, - digest BLOB NOT NULL, - delay INTEGER, + digest BLOB, created_at TEXT NOT NULL DEFAULT (datetime('now')), updated_at TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE INDEX idx_snd_file_chunks_snd_file_id ON snd_file_chunks(snd_file_id); --- ? add fk to snd_file_descriptions? --- ? probably it's not necessary since these entities are --- ? required at different stages of sending files --- ?
replicas on upload, description on notifying client CREATE TABLE snd_file_chunk_replicas ( snd_file_chunk_replica_id INTEGER PRIMARY KEY, snd_file_chunk_id INTEGER NOT NULL REFERENCES snd_file_chunks ON DELETE CASCADE, @@ -51,8 +47,8 @@ CREATE TABLE snd_file_chunk_replicas ( xftp_server_id INTEGER NOT NULL REFERENCES xftp_servers ON DELETE CASCADE, replica_id BLOB NOT NULL, replica_key BLOB NOT NULL, - -- created INTEGER NOT NULL DEFAULT 0, -- as in XFTP create - registered on server - uploaded INTEGER NOT NULL DEFAULT 0, + replica_status TEXT NOT NULL, + delay INTEGER, retries INTEGER NOT NULL DEFAULT 0, created_at TEXT NOT NULL DEFAULT (datetime('now')), updated_at TEXT NOT NULL DEFAULT (datetime('now')) @@ -71,15 +67,21 @@ CREATE TABLE snd_file_chunk_replica_recipients ( ); CREATE INDEX idx_snd_file_chunk_replica_recipients_snd_file_chunk_replica_id ON snd_file_chunk_replica_recipients(snd_file_chunk_replica_id); - -CREATE TABLE snd_file_descriptions ( - snd_file_description_id INTEGER PRIMARY KEY, - snd_file_id INTEGER NOT NULL REFERENCES snd_files ON DELETE CASCADE, - sender INTEGER NOT NULL, -- 1 for sender file description - descr_text TEXT NOT NULL, - created_at TEXT NOT NULL DEFAULT (datetime('now')), - updated_at TEXT NOT NULL DEFAULT (datetime('now')) -); - -CREATE INDEX idx_snd_file_descriptions_snd_file_id ON snd_file_descriptions(snd_file_id); +|] + +down_m20230401_snd_files :: Query +down_m20230401_snd_files = + [sql| +DROP INDEX idx_snd_file_chunk_replica_recipients_snd_file_chunk_replica_id; +DROP TABLE snd_file_chunk_replica_recipients; + +DROP INDEX idx_snd_file_chunk_replicas_snd_file_chunk_id; +DROP INDEX idx_snd_file_chunk_replicas_xftp_server_id; +DROP TABLE snd_file_chunk_replicas; + +DROP INDEX idx_snd_file_chunks_snd_file_id; +DROP TABLE snd_file_chunks; + +DROP INDEX idx_snd_files_user_id; +DROP TABLE snd_files; |] diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql 
b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql index ee8cf9ff2..bfef4afd7 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql @@ -345,3 +345,61 @@ CREATE INDEX idx_rcv_file_chunk_replicas_rcv_file_chunk_id ON rcv_file_chunk_rep CREATE INDEX idx_rcv_file_chunk_replicas_xftp_server_id ON rcv_file_chunk_replicas( xftp_server_id ); +CREATE TABLE snd_files( + snd_file_id INTEGER PRIMARY KEY, + snd_file_entity_id BLOB NOT NULL, + user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE, + num_recipients INTEGER NOT NULL, + digest BLOB, + key BLOB NOT NUll, + nonce BLOB NOT NUll, + path TEXT NOT NULL, + prefix_path TEXT, + status TEXT NOT NULL, + deleted INTEGER NOT NULL DEFAULT 0, + error TEXT, + created_at TEXT NOT NULL DEFAULT(datetime('now')), + updated_at TEXT NOT NULL DEFAULT(datetime('now')) +); +CREATE INDEX idx_snd_files_user_id ON snd_files(user_id); +CREATE TABLE snd_file_chunks( + snd_file_chunk_id INTEGER PRIMARY KEY, + snd_file_id INTEGER NOT NULL REFERENCES snd_files ON DELETE CASCADE, + chunk_no INTEGER NOT NULL, + chunk_offset INTEGER NOT NULL, + chunk_size INTEGER NOT NULL, + digest BLOB, + created_at TEXT NOT NULL DEFAULT(datetime('now')), + updated_at TEXT NOT NULL DEFAULT(datetime('now')) +); +CREATE INDEX idx_snd_file_chunks_snd_file_id ON snd_file_chunks(snd_file_id); +CREATE TABLE snd_file_chunk_replicas( + snd_file_chunk_replica_id INTEGER PRIMARY KEY, + snd_file_chunk_id INTEGER NOT NULL REFERENCES snd_file_chunks ON DELETE CASCADE, + replica_number INTEGER NOT NULL, + xftp_server_id INTEGER NOT NULL REFERENCES xftp_servers ON DELETE CASCADE, + replica_id BLOB NOT NULL, + replica_key BLOB NOT NULL, + replica_status TEXT NOT NULL, + delay INTEGER, + retries INTEGER NOT NULL DEFAULT 0, + created_at TEXT NOT NULL DEFAULT(datetime('now')), + updated_at TEXT NOT NULL DEFAULT(datetime('now')) +); +CREATE INDEX 
idx_snd_file_chunk_replicas_snd_file_chunk_id ON snd_file_chunk_replicas( + snd_file_chunk_id +); +CREATE INDEX idx_snd_file_chunk_replicas_xftp_server_id ON snd_file_chunk_replicas( + xftp_server_id +); +CREATE TABLE snd_file_chunk_replica_recipients( + snd_file_chunk_replica_recipient_id INTEGER PRIMARY KEY, + snd_file_chunk_replica_id INTEGER NOT NULL REFERENCES snd_file_chunk_replicas ON DELETE CASCADE, + rcv_replica_id BLOB NOT NULL, + rcv_replica_key BLOB NOT NULL, + created_at TEXT NOT NULL DEFAULT(datetime('now')), + updated_at TEXT NOT NULL DEFAULT(datetime('now')) +); +CREATE INDEX idx_snd_file_chunk_replica_recipients_snd_file_chunk_replica_id ON snd_file_chunk_replica_recipients( + snd_file_chunk_replica_id +);