diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index 4674c4109..e9a063866 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -19,7 +19,9 @@ module Simplex.FileTransfer.Agent deleteRcvFile, -- Sending files sendFileExperimental, - _sendFile, + sendFile, + deleteSndFileInternal, + deleteSndFileRemote, ) where @@ -33,18 +35,22 @@ import Data.Bifunctor (first) import Data.ByteString (ByteString) import qualified Data.ByteString.Base64.URL as U import qualified Data.ByteString.Char8 as B +import qualified Data.ByteString.Lazy.Char8 as LB import Data.Composition ((.:)) import Data.Int (Int64) -import Data.List (foldl', isSuffixOf, partition) +import Data.List (foldl', isSuffixOf, partition, sortOn) import Data.List.NonEmpty (nonEmpty) import qualified Data.List.NonEmpty as L +import Data.Map (Map) import qualified Data.Map.Strict as M +import Data.Text (Text) import Data.Time.Clock (getCurrentTime) import Data.Time.Format (defaultTimeLocale, formatTime) -import Simplex.FileTransfer.Client.Main (CLIError, SendOptions (..), cliSendFileOpts) +import Simplex.FileTransfer.Client (XFTPChunkSpec (..)) +import Simplex.FileTransfer.Client.Main import Simplex.FileTransfer.Crypto import Simplex.FileTransfer.Description -import Simplex.FileTransfer.Protocol (FileParty (..), FilePartyI) +import Simplex.FileTransfer.Protocol (FileParty (..), FilePartyI, SFileParty (..)) import Simplex.FileTransfer.Transport (XFTPRcvChunkSpec (..)) import Simplex.FileTransfer.Types import Simplex.FileTransfer.Util (removePath, uniqueCombine) @@ -53,10 +59,14 @@ import Simplex.Messaging.Agent.Env.SQLite import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval import Simplex.Messaging.Agent.Store.SQLite +import qualified Simplex.Messaging.Crypto as C +import qualified Simplex.Messaging.Crypto.Lazy as LC +import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (EntityId, XFTPServer, XFTPServerWithAuth) +import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM -import Simplex.Messaging.Util (liftError, liftIOEither, tshow, whenM) +import Simplex.Messaging.Util (bshow, liftError, liftIOEither, tshow, whenM) import System.FilePath (takeFileName, ()) import UnliftIO import UnliftIO.Concurrent @@ -67,29 +77,42 @@ startWorkers :: AgentMonad m => AgentClient -> Maybe FilePath -> m () startWorkers c workDir = do wd <- asks $ xftpWorkDir . xftpAgent atomically $ writeTVar wd workDir - startFiles + startRcvFiles + startSndFiles + startDelFiles where - startFiles = do - rcvFilesTTL <- asks (rcvFilesTTL . config) + startRcvFiles = do + rcvFilesTTL <- asks $ rcvFilesTTL . config pendingRcvServers <- withStore' c (`getPendingRcvFilesServers` rcvFilesTTL) - forM_ pendingRcvServers $ \s -> addXFTPWorker c (Just s) + forM_ pendingRcvServers $ \s -> addXFTPRcvWorker c (Just s) -- start local worker for files pending decryption, -- no need to make an extra query for the check -- as the worker will check the store anyway - addXFTPWorker c Nothing + addXFTPRcvWorker c Nothing + startSndFiles = do + sndFilesTTL <- asks $ sndFilesTTL . config + -- start worker for files pending encryption/creation + addXFTPSndWorker c Nothing + pendingSndServers <- withStore' c (`getPendingSndFilesServers` sndFilesTTL) + forM_ pendingSndServers $ \s -> addXFTPSndWorker c (Just s) + startDelFiles = do + rcvFilesTTL <- asks $ rcvFilesTTL . config + pendingDelServers <- withStore' c (`getPendingDelFilesServers` rcvFilesTTL) + forM_ pendingDelServers $ addXFTPDelWorker c closeXFTPAgent :: MonadUnliftIO m => XFTPAgent -> m () -closeXFTPAgent XFTPAgent {xftpWorkers} = do - ws <- atomically $ stateTVar xftpWorkers (,M.empty) - mapM_ (uninterruptibleCancel . snd) ws +closeXFTPAgent XFTPAgent {xftpRcvWorkers, xftpSndWorkers} = do + stopWorkers xftpRcvWorkers + stopWorkers xftpSndWorkers + where + stopWorkers wsSel = do + ws <- atomically $ stateTVar wsSel (,M.empty) + mapM_ (uninterruptibleCancel . snd) ws receiveFile :: AgentMonad m => AgentClient -> UserId -> ValidFileDescription 'FRecipient -> m RcvFileId receiveFile c userId (ValidFileDescription fd@FileDescription {chunks}) = do g <- asks idsDrg - workPath <- getXFTPWorkPath - ts <- liftIO getCurrentTime - let isoTime = formatTime defaultTimeLocale "%Y%m%d_%H%M%S_%6q" ts - prefixPath <- uniqueCombine workPath (isoTime <> "_rcv.xftp") + prefixPath <- getPrefixPath "rcv.xftp" createDirectory prefixPath let relPrefixPath = takeFileName prefixPath relTmpPath = relPrefixPath "xftp.encrypted" @@ -102,9 +125,16 @@ receiveFile c userId (ValidFileDescription fd@FileDescription {chunks}) = do where downloadChunk :: AgentMonad m => FileChunk -> m () downloadChunk FileChunk {replicas = (FileChunkReplica {server} : _)} = do - addXFTPWorker c (Just server) + addXFTPRcvWorker c (Just server) downloadChunk _ = throwError $ INTERNAL "no replicas" +getPrefixPath :: AgentMonad m => String -> m FilePath +getPrefixPath suffix = do + workPath <- getXFTPWorkPath + ts <- liftIO getCurrentTime + let isoTime = formatTime defaultTimeLocale "%Y%m%d_%H%M%S_%6q" ts + uniqueCombine workPath (isoTime <> "_" <> suffix) + toFSFilePath :: AgentMonad m => FilePath -> m FilePath toFSFilePath f = ( f) <$> getXFTPWorkPath @@ -113,73 +143,77 @@ createEmptyFile fPath = do h <- openFile fPath AppendMode liftIO $ B.hPut h "" >> hFlush h -addXFTPWorker :: AgentMonad m => AgentClient -> Maybe XFTPServer -> m () -addXFTPWorker c srv_ = do - ws <- asks $ xftpWorkers . xftpAgent +addXFTPRcvWorker :: AgentMonad m => AgentClient -> Maybe XFTPServer -> m () +addXFTPRcvWorker c = addWorker c xftpRcvWorkers runXFTPRcvWorker runXFTPRcvLocalWorker + +addWorker :: + AgentMonad m => + AgentClient -> + (XFTPAgent -> TMap (Maybe XFTPServer) (TMVar (), Async ())) -> + (AgentClient -> XFTPServer -> TMVar () -> m ()) -> + (AgentClient -> TMVar () -> m ()) -> + Maybe XFTPServer -> + m () +addWorker c wsSel runWorker runWorkerNoSrv srv_ = do + ws <- asks $ wsSel . xftpAgent atomically (TM.lookup srv_ ws) >>= \case Nothing -> do doWork <- newTMVarIO () - let runWorker = case srv_ of - Just srv -> runXFTPWorker c srv doWork - Nothing -> runXFTPLocalWorker c doWork - worker <- async $ runWorker `E.finally` atomically (TM.delete srv_ ws) + let runWorker' = case srv_ of + Just srv -> runWorker c srv doWork + Nothing -> runWorkerNoSrv c doWork + worker <- async $ runWorker' `E.finally` atomically (TM.delete srv_ ws) atomically $ TM.insert srv_ (doWork, worker) ws Just (doWork, _) -> void . atomically $ tryPutTMVar doWork () -runXFTPWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m () -runXFTPWorker c srv doWork = do +runXFTPRcvWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m () +runXFTPRcvWorker c srv doWork = do forever $ do void . atomically $ readTMVar doWork - agentOperationBracket c AORcvNetwork waitUntilActive runXftpOperation + atomically $ checkAgentForeground c + runXFTPOperation where noWorkToDo = void . atomically $ tryTakeTMVar doWork - runXftpOperation :: m () - runXftpOperation = do + runXFTPOperation :: m () + runXFTPOperation = do rcvFilesTTL <- asks (rcvFilesTTL . config) nextChunk <- withStore' c $ \db -> getNextRcvChunkToDownload db srv rcvFilesTTL case nextChunk of Nothing -> noWorkToDo - Just RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []} -> workerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) "chunk has no replicas" - Just fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, delay} : _} -> do + Just RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []} -> rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) "chunk has no replicas" + Just fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, rcvChunkId, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _} -> do ri <- asks $ reconnectInterval . config let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay withRetryInterval ri' $ \delay' loop -> downloadFileChunk fc replica - `catchError` retryOnError delay' loop (workerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) . show) + `catchError` \e -> retryOnError "XFTP rcv worker" (retryLoop loop e delay') (retryDone e) e where - retryOnError :: Int64 -> m () -> (AgentErrorType -> m ()) -> AgentErrorType -> m () - retryOnError replicaDelay loop done e = do - logError $ "XFTP worker error: " <> tshow e - if temporaryAgentError e - then retryLoop - else done e - where - retryLoop = do - notifyOnRetry <- asks (xftpNotifyErrsOnRetry . config) - when notifyOnRetry $ notifyInternalError c rcvFileEntityId $ show e - closeXFTPServerClient c userId replica - withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay - atomically $ endAgentOperation c AORcvNetwork - atomically $ throwWhenInactive c - atomically $ beginAgentOperation c AORcvNetwork - loop + retryLoop loop e replicaDelay = do + flip catchError (\_ -> pure ()) $ do + notifyOnRetry <- asks (xftpNotifyErrsOnRetry . config) + when notifyOnRetry $ notify c rcvFileEntityId $ RFERR e + closeXFTPServerClient c userId server $ bshow rcvChunkId + withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay + atomically $ checkAgentForeground c + loop + retryDone e = rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) (show e) downloadFileChunk :: RcvFileChunk -> RcvFileChunkReplica -> m () downloadFileChunk RcvFileChunk {userId, rcvFileId, rcvFileEntityId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath} replica = do fsFileTmpPath <- toFSFilePath fileTmpPath chunkPath <- uniqueCombine fsFileTmpPath $ show chunkNo let chunkSpec = XFTPRcvChunkSpec chunkPath (unFileSize chunkSize) (unFileDigest digest) relChunkPath = fileTmpPath takeFileName chunkPath - agentXFTPDownloadChunk c userId replica chunkSpec + agentXFTPDownloadChunk c userId rcvChunkId replica chunkSpec (complete, progress) <- withStore c $ \db -> runExceptT $ do - RcvFile {size = FileSize total, chunks} <- - ExceptT $ updateRcvFileChunkReceived db (rcvChunkReplicaId replica) rcvChunkId rcvFileId relChunkPath + liftIO $ updateRcvFileChunkReceived db (rcvChunkReplicaId replica) rcvChunkId relChunkPath + RcvFile {size = FileSize total, chunks} <- ExceptT $ getRcvFile db rcvFileId let rcvd = receivedSize chunks complete = all chunkReceived chunks liftIO . when complete $ updateRcvFileStatus db rcvFileId RFSReceived pure (complete, RFPROG rcvd total) - liftIO $ notify c rcvFileEntityId progress - when complete $ addXFTPWorker c Nothing + notify c rcvFileEntityId progress + when complete $ addXFTPRcvWorker c Nothing where receivedSize :: [RcvFileChunk] -> Int64 receivedSize = foldl' (\sz ch -> sz + receivedChunkSize ch) 0 @@ -188,30 +222,34 @@ runXFTPWorker c srv doWork = do | otherwise = 0 chunkReceived RcvFileChunk {replicas} = any received replicas -workerInternalError :: AgentMonad m => AgentClient -> DBRcvFileId -> RcvFileId -> Maybe FilePath -> String -> m () -workerInternalError c rcvFileId rcvFileEntityId tmpPath internalErrStr = do +retryOnError :: AgentMonad m => Text -> m a -> m a -> AgentErrorType -> m a +retryOnError name loop done e = do + logError $ name <> " error: " <> tshow e + if temporaryAgentError e + then loop + else done + +rcvWorkerInternalError :: AgentMonad m => AgentClient -> DBRcvFileId -> RcvFileId -> Maybe FilePath -> String -> m () +rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath internalErrStr = do forM_ tmpPath (removePath <=< toFSFilePath) withStore' c $ \db -> updateRcvFileError db rcvFileId internalErrStr - notifyInternalError c rcvFileEntityId internalErrStr + notify c rcvFileEntityId $ RFERR $ INTERNAL internalErrStr -notifyInternalError :: (MonadUnliftIO m) => AgentClient -> RcvFileId -> String -> m () -notifyInternalError AgentClient {subQ} rcvFileEntityId internalErrStr = atomically $ writeTBQueue subQ ("", rcvFileEntityId, APC SAERcvFile $ RFERR $ INTERNAL internalErrStr) - -runXFTPLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m () -runXFTPLocalWorker c doWork = do +runXFTPRcvLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m () +runXFTPRcvLocalWorker c doWork = do forever $ do void . atomically $ readTMVar doWork - -- TODO agentOperationBracket? - runXftpOperation + atomically $ checkAgentForeground c + runXFTPOperation where - runXftpOperation :: m () - runXftpOperation = do + runXFTPOperation :: m () + runXFTPOperation = do rcvFilesTTL <- asks (rcvFilesTTL . config) nextFile <- withStore' c (`getNextRcvFileToDecrypt` rcvFilesTTL) case nextFile of Nothing -> noWorkToDo Just f@RcvFile {rcvFileId, rcvFileEntityId, tmpPath} -> - decryptFile f `catchError` (workerInternalError c rcvFileId rcvFileEntityId tmpPath . show) + decryptFile f `catchError` (rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath . show) noWorkToDo = void . atomically $ tryTakeTMVar doWork decryptFile :: RcvFile -> m () decryptFile RcvFile {rcvFileId, rcvFileEntityId, key, nonce, tmpPath, savePath, status, chunks} = do @@ -222,9 +260,9 @@ runXFTPLocalWorker c doWork = do chunkPaths <- getChunkPaths chunks encSize <- liftIO $ foldM (\s path -> (s +) . fromIntegral <$> getFileSize path) 0 chunkPaths void $ liftError (INTERNAL . show) $ decryptChunks encSize chunkPaths key nonce $ \_ -> pure fsSavePath + notify c rcvFileEntityId $ RFDONE fsSavePath forM_ tmpPath (removePath <=< toFSFilePath) withStore' c (`updateRcvFileComplete` rcvFileId) - liftIO $ notify c rcvFileEntityId $ RFDONE fsSavePath where getChunkPaths :: [RcvFileChunk] -> m [FilePath] getChunkPaths [] = pure [] @@ -264,7 +302,7 @@ sendFileExperimental c@AgentClient {xftpServers} userId filePath numRecipients = createDirectoryIfMissing False tempPath runSend fileName outputDir tempPath `catchError` \e -> do cleanup outputDir tempPath - liftIO $ notify c sndFileId $ SFERR e + notify c sndFileId $ SFERR e where runSend :: String -> FilePath -> FilePath -> m () runSend fileName outputDir tempPath = do @@ -281,7 +319,7 @@ sendFileExperimental c@AgentClient {xftpServers} userId filePath numRecipients = liftCLI $ cliSendFileOpts sendOptions False $ notify c sndFileId .: SFPROG (sndDescr, rcvDescrs) <- readDescrs outputDir fileName cleanup outputDir tempPath - liftIO $ notify c sndFileId $ SFDONE sndDescr rcvDescrs + notify c sndFileId $ SFDONE sndDescr rcvDescrs cleanup :: FilePath -> FilePath -> m () cleanup outputDir tempPath = do removePath tempPath @@ -299,71 +337,304 @@ sendFileExperimental c@AgentClient {xftpServers} userId filePath numRecipients = readDescr :: FilePartyI p => FilePath -> m (ValidFileDescription p) readDescr f = liftIOEither $ first INTERNAL . strDecode <$> B.readFile f -notify :: forall e. AEntityI e => AgentClient -> EntityId -> ACommand 'Agent e -> IO () +notify :: forall m e. (MonadUnliftIO m, AEntityI e) => AgentClient -> EntityId -> ACommand 'Agent e -> m () notify c entId cmd = atomically $ writeTBQueue (subQ c) ("", entId, APC (sAEntity @e) cmd) --- _sendFile :: AgentMonad m => AgentClient -> UserId -> FilePath -> Int -> m SndFileId -_sendFile :: AgentClient -> UserId -> FilePath -> Int -> m SndFileId -_sendFile _c _userId _filePath _numRecipients = do - -- db: create file in status New without chunks - -- add local snd worker for encryption - -- return file id to client - undefined +sendFile :: AgentMonad m => AgentClient -> UserId -> FilePath -> Int -> m SndFileId +sendFile c userId filePath numRecipients = do + g <- asks idsDrg + prefixPath <- getPrefixPath "snd.xftp" + createDirectory prefixPath + let relPrefixPath = takeFileName prefixPath + key <- liftIO C.randomSbKey + nonce <- liftIO C.randomCbNonce + -- saving absolute filePath will not allow to restore file encryption after app update, but it's a short window + fId <- withStore c $ \db -> createSndFile db g userId numRecipients filePath relPrefixPath key nonce + addXFTPSndWorker c Nothing + pure fId -_runXFTPSndLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m () -_runXFTPSndLocalWorker _c doWork = do +addXFTPSndWorker :: AgentMonad m => AgentClient -> Maybe XFTPServer -> m () +addXFTPSndWorker c = addWorker c xftpSndWorkers runXFTPSndWorker runXFTPSndPrepareWorker + +runXFTPSndPrepareWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m () +runXFTPSndPrepareWorker c doWork = do forever $ do void . atomically $ readTMVar doWork - runXftpOperation + atomically $ checkAgentForeground c + runXFTPOperation where - runXftpOperation :: m () - runXftpOperation = do - -- db: get next snd file to encrypt (in status New) - -- ? (or Encrypted to retry create? - see below) - -- with fixed retries (?) encryptFile - undefined - _encryptFile :: SndFile -> m () - _encryptFile _sndFile = do - -- if enc path exists, remove it - -- if enc path doesn't exist: - -- - choose enc path - -- - touch file, db: update enc path (?) - -- calculate chunk sizes, encrypt file to enc path - -- calculate digest - -- prepare chunk specs - -- db: - -- - update file status to Encrypted - -- - create chunks according to chunk specs - -- ? since which servers are online is unknown, - -- ? we can't blindly assign servers to replicas. - -- ? should we XFTP create chunks on servers here, - -- ? with retrying for different servers, - -- ? keeping a list of servers that were tried? - -- ? then we can add replicas to chunks in db - -- ? and update file status to Uploading, - -- ? probably in same transaction as creating chunks, - -- ? and add XFTP snd workers for uploading chunks. - undefined + runXFTPOperation :: m () + runXFTPOperation = do + sndFilesTTL <- asks (sndFilesTTL . config) + nextFile <- withStore' c (`getNextSndFileToPrepare` sndFilesTTL) + case nextFile of + Nothing -> noWorkToDo + Just f@SndFile {sndFileId, sndFileEntityId, prefixPath} -> + prepareFile f `catchError` (sndWorkerInternalError c sndFileId sndFileEntityId prefixPath . show) + noWorkToDo = void . atomically $ tryTakeTMVar doWork + prepareFile :: SndFile -> m () + prepareFile SndFile {prefixPath = Nothing} = + throwError $ INTERNAL "no prefix path" + prepareFile sndFile@SndFile {sndFileId, userId, prefixPath = Just ppath, status} = do + SndFile {numRecipients, chunks} <- + if status /= SFSEncrypted -- status is SFSNew or SFSEncrypting + then do + fsEncPath <- toFSFilePath $ sndFileEncPath ppath + when (status == SFSEncrypting) $ + whenM (doesFileExist fsEncPath) $ removeFile fsEncPath + withStore' c $ \db -> updateSndFileStatus db sndFileId SFSEncrypting + (digest, chunkSpecsDigests) <- encryptFileForUpload sndFile fsEncPath + withStore c $ \db -> do + updateSndFileEncrypted db sndFileId digest chunkSpecsDigests + getSndFile db sndFileId + else pure sndFile + maxRecipients <- asks (xftpMaxRecipientsPerRequest . config) + let numRecipients' = min numRecipients maxRecipients + -- concurrently? + forM_ (filter (not . chunkCreated) chunks) $ createChunk numRecipients' + withStore' c $ \db -> updateSndFileStatus db sndFileId SFSUploading + where + encryptFileForUpload :: SndFile -> FilePath -> m (FileDigest, [(XFTPChunkSpec, FileDigest)]) + encryptFileForUpload SndFile {key, nonce, filePath} fsEncPath = do + let fileName = takeFileName filePath + fileSize <- fromInteger <$> getFileSize filePath + when (fileSize > maxFileSize) $ throwError $ INTERNAL "max file size exceeded" + let fileHdr = smpEncode FileHeader {fileName, fileExtra = Nothing} + fileSize' = fromIntegral (B.length fileHdr) + fileSize + chunkSizes = prepareChunkSizes $ fileSize' + fileSizeLen + authTagSize + chunkSizes' = map fromIntegral chunkSizes + encSize = sum chunkSizes' + void $ liftError (INTERNAL . show) $ encryptFile filePath fileHdr key nonce fileSize' encSize fsEncPath + digest <- liftIO $ LC.sha512Hash <$> LB.readFile fsEncPath + let chunkSpecs = prepareChunkSpecs fsEncPath chunkSizes + chunkDigests <- map FileDigest <$> mapM (liftIO . getChunkDigest) chunkSpecs + pure (FileDigest digest, zip chunkSpecs chunkDigests) + chunkCreated :: SndFileChunk -> Bool + chunkCreated SndFileChunk {replicas} = + any (\SndFileChunkReplica {replicaStatus} -> replicaStatus == SFRSCreated) replicas + createChunk :: Int -> SndFileChunk -> m () + createChunk numRecipients' ch = do + atomically $ checkAgentForeground c + (replica, ProtoServerWithAuth srv _) <- agentOperationBracket c AOSndNetwork throwWhenInactive tryCreate + withStore' c $ \db -> createSndFileReplica db ch replica + addXFTPSndWorker c $ Just srv + where + tryCreate = do + ri <- asks $ messageRetryInterval . config + usedSrvs <- newTVarIO ([] :: [XFTPServer]) + withRetryInterval (riFast ri) $ \_ loop -> + createWithNextSrv usedSrvs + `catchError` \e -> retryOnError "XFTP prepare worker" (retryLoop loop) (throwError e) e + where + retryLoop loop = atomically (checkAgentForeground c) >> loop + createWithNextSrv usedSrvs = do + deleted <- withStore' c $ \db -> getSndFileDeleted db sndFileId + when deleted $ throwError $ INTERNAL "file deleted, aborting chunk creation" + withNextSrv c userId usedSrvs [] $ \srvAuth -> do + replica <- agentXFTPNewChunk c ch numRecipients' srvAuth + pure (replica, srvAuth) -_runXFTPSndWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m () -_runXFTPSndWorker c _srv doWork = do +sndWorkerInternalError :: AgentMonad m => AgentClient -> DBSndFileId -> SndFileId -> Maybe FilePath -> String -> m () +sndWorkerInternalError c sndFileId sndFileEntityId prefixPath internalErrStr = do + forM_ prefixPath $ removePath <=< toFSFilePath + withStore' c $ \db -> updateSndFileError db sndFileId internalErrStr + notify c sndFileEntityId $ SFERR $ INTERNAL internalErrStr + +runXFTPSndWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m () +runXFTPSndWorker c srv doWork = do forever $ do void . atomically $ readTMVar doWork - agentOperationBracket c AOSndNetwork throwWhenInactive runXftpOperation + atomically $ checkAgentForeground c + agentOperationBracket c AOSndNetwork throwWhenInactive runXFTPOperation where - runXftpOperation :: m () - runXftpOperation = do - -- db: get next snd chunk to upload (replica is not uploaded) - -- with retry interval uploadChunk - -- - with fixed retries, repeat N times: - -- check if other files are in upload, delay (see xftpSndFiles in XFTPAgent) - undefined - _uploadFileChunk :: SndFileChunk -> m () - _uploadFileChunk _sndFileChunk = do - -- add file id to xftpSndFiles - -- XFTP upload chunk - -- db: update replica status to Uploaded, return SndFile - -- if all SndFile's replicas are uploaded: - -- - serialize file descriptions and notify client - -- - remove file id from xftpSndFiles - undefined + noWorkToDo = void . atomically $ tryTakeTMVar doWork + runXFTPOperation :: m () + runXFTPOperation = do + sndFilesTTL <- asks (sndFilesTTL . config) + nextChunk <- withStore' c $ \db -> getNextSndChunkToUpload db srv sndFilesTTL + case nextChunk of + Nothing -> noWorkToDo + Just SndFileChunk {sndFileId, sndFileEntityId, filePrefixPath, replicas = []} -> sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) "chunk has no replicas" + Just fc@SndFileChunk {userId, sndFileId, sndChunkId, sndFileEntityId, filePrefixPath, replicas = replica@SndFileChunkReplica {sndChunkReplicaId, server, delay} : _} -> do + ri <- asks $ reconnectInterval . config + let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay + withRetryInterval ri' $ \delay' loop -> + uploadFileChunk fc replica + `catchError` \e -> retryOnError "XFTP snd worker" (retryLoop loop e delay') (retryDone e) e + where + retryLoop loop e replicaDelay = do + flip catchError (\_ -> pure ()) $ do + notifyOnRetry <- asks (xftpNotifyErrsOnRetry . config) + when notifyOnRetry $ notify c sndFileEntityId $ SFERR e + closeXFTPServerClient c userId server $ bshow sndChunkId + withStore' c $ \db -> updateSndChunkReplicaDelay db sndChunkReplicaId replicaDelay + atomically $ checkAgentForeground c + loop + retryDone e = sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) (show e) + uploadFileChunk :: SndFileChunk -> SndFileChunkReplica -> m () + uploadFileChunk sndFileChunk@SndFileChunk {sndFileId, sndChunkId, userId, chunkSpec = chunkSpec@XFTPChunkSpec {filePath}} replica = do + replica'@SndFileChunkReplica {sndChunkReplicaId} <- addRecipients sndFileChunk replica + fsFilePath <- toFSFilePath filePath + let chunkSpec' = chunkSpec {filePath = fsFilePath} :: XFTPChunkSpec + atomically $ checkAgentForeground c + agentXFTPUploadChunk c userId sndChunkId replica' chunkSpec' + sf@SndFile {sndFileEntityId, prefixPath, chunks} <- withStore c $ \db -> do + updateSndChunkReplicaStatus db sndChunkReplicaId SFRSUploaded + getSndFile db sndFileId + let uploaded = uploadedSize chunks + total = totalSize chunks + complete = all chunkUploaded chunks + notify c sndFileEntityId $ SFPROG uploaded total + when complete $ do + (sndDescr, rcvDescrs) <- sndFileToDescrs sf + notify c sndFileEntityId $ SFDONE sndDescr rcvDescrs + forM_ prefixPath $ removePath <=< toFSFilePath + withStore' c $ \db -> updateSndFileComplete db sndFileId + where + addRecipients :: SndFileChunk -> SndFileChunkReplica -> m SndFileChunkReplica + addRecipients ch@SndFileChunk {numRecipients} cr@SndFileChunkReplica {rcvIdsKeys} + | length rcvIdsKeys > numRecipients = throwError $ INTERNAL "too many recipients" + | length rcvIdsKeys == numRecipients = pure cr + | otherwise = do + maxRecipients <- asks $ xftpMaxRecipientsPerRequest . config + let numRecipients' = min (numRecipients - length rcvIdsKeys) maxRecipients + rcvIdsKeys' <- agentXFTPAddRecipients c userId sndChunkId cr numRecipients' + cr' <- withStore' c $ \db -> addSndChunkReplicaRecipients db cr $ L.toList rcvIdsKeys' + addRecipients ch cr' + sndFileToDescrs :: SndFile -> m (ValidFileDescription 'FSender, [ValidFileDescription 'FRecipient]) + sndFileToDescrs SndFile {digest = Nothing} = throwError $ INTERNAL "snd file has no digest" + sndFileToDescrs SndFile {chunks = []} = throwError $ INTERNAL "snd file has no chunks" + sndFileToDescrs SndFile {digest = Just digest, key, nonce, chunks = chunks@(fstChunk : _)} = do + let chunkSize = FileSize $ sndChunkSize fstChunk + size = FileSize $ sum $ map (fromIntegral . sndChunkSize) chunks + -- snd description + sndDescrChunks <- mapM toSndDescrChunk chunks + let fdSnd = FileDescription {party = SFSender, size, digest, key, nonce, chunkSize, chunks = sndDescrChunks} + validFdSnd <- either (throwError . INTERNAL) pure $ validateFileDescription fdSnd + -- rcv descriptions + let fdRcv = FileDescription {party = SFRecipient, size, digest, key, nonce, chunkSize, chunks = []} + fdRcvs = createRcvFileDescriptions fdRcv chunks + validFdRcvs <- either (throwError . INTERNAL) pure $ mapM validateFileDescription fdRcvs + pure (validFdSnd, validFdRcvs) + toSndDescrChunk :: SndFileChunk -> m FileChunk + toSndDescrChunk SndFileChunk {replicas = []} = throwError $ INTERNAL "snd file chunk has no replicas" + toSndDescrChunk ch@SndFileChunk {chunkNo, digest = chDigest, replicas = (SndFileChunkReplica {server, replicaId, replicaKey} : _)} = do + let chunkSize = FileSize $ sndChunkSize ch + replicas = [FileChunkReplica {server, replicaId, replicaKey}] + pure FileChunk {chunkNo, digest = chDigest, chunkSize, replicas} + createRcvFileDescriptions :: FileDescription 'FRecipient -> [SndFileChunk] -> [FileDescription 'FRecipient] + createRcvFileDescriptions fd sndChunks = map (\chunks -> (fd :: (FileDescription 'FRecipient)) {chunks}) rcvChunks + where + rcvReplicas :: [SentRecipientReplica] + rcvReplicas = concatMap toSentRecipientReplicas sndChunks + toSentRecipientReplicas :: SndFileChunk -> [SentRecipientReplica] + toSentRecipientReplicas ch@SndFileChunk {chunkNo, digest, replicas} = + let chunkSize = FileSize $ sndChunkSize ch + in concatMap + ( \SndFileChunkReplica {server, rcvIdsKeys} -> + zipWith + (\rcvNo (replicaId, replicaKey) -> SentRecipientReplica {chunkNo, server, rcvNo, replicaId, replicaKey, digest, chunkSize}) + [1 ..] + rcvIdsKeys + ) + replicas + rcvChunks :: [[FileChunk]] + rcvChunks = map (sortChunks . M.elems) $ M.elems $ foldl' addRcvChunk M.empty rcvReplicas + sortChunks :: [FileChunk] -> [FileChunk] + sortChunks = map reverseReplicas . sortOn (chunkNo :: FileChunk -> Int) + reverseReplicas ch@FileChunk {replicas} = (ch :: FileChunk) {replicas = reverse replicas} + addRcvChunk :: Map Int (Map Int FileChunk) -> SentRecipientReplica -> Map Int (Map Int FileChunk) + addRcvChunk m SentRecipientReplica {chunkNo, server, rcvNo, replicaId, replicaKey, digest, chunkSize} = + M.alter (Just . addOrChangeRecipient) rcvNo m + where + addOrChangeRecipient :: Maybe (Map Int FileChunk) -> Map Int FileChunk + addOrChangeRecipient = \case + Just m' -> M.alter (Just . addOrChangeChunk) chunkNo m' + _ -> M.singleton chunkNo $ FileChunk {chunkNo, digest, chunkSize, replicas = [replica']} + addOrChangeChunk :: Maybe FileChunk -> FileChunk + addOrChangeChunk = \case + Just ch@FileChunk {replicas} -> ch {replicas = replica' : replicas} + _ -> FileChunk {chunkNo, digest, chunkSize, replicas = [replica']} + replica' = FileChunkReplica {server, replicaId, replicaKey} + uploadedSize :: [SndFileChunk] -> Int64 + uploadedSize = foldl' (\sz ch -> sz + uploadedChunkSize ch) 0 + uploadedChunkSize ch + | chunkUploaded ch = fromIntegral (sndChunkSize ch) + | otherwise = 0 + totalSize :: [SndFileChunk] -> Int64 + totalSize = foldl' (\sz ch -> sz + fromIntegral (sndChunkSize ch)) 0 + chunkUploaded :: SndFileChunk -> Bool + chunkUploaded SndFileChunk {replicas} = + any (\SndFileChunkReplica {replicaStatus} -> replicaStatus == SFRSUploaded) replicas + +deleteSndFileInternal :: AgentMonad m => AgentClient -> UserId -> SndFileId -> m () +deleteSndFileInternal c userId sndFileEntityId = do + SndFile {sndFileId, prefixPath, status} <- withStore c $ \db -> getSndFileByEntityId db userId sndFileEntityId + if status == SFSComplete || status == SFSError + then do + forM_ prefixPath $ removePath <=< toFSFilePath + withStore' c (`deleteSndFile'` sndFileId) + else withStore' c (`updateSndFileDeleted` sndFileId) + +deleteSndFileRemote :: forall m. AgentMonad m => AgentClient -> UserId -> SndFileId -> ValidFileDescription 'FSender -> m () +deleteSndFileRemote c userId sndFileEntityId (ValidFileDescription FileDescription {chunks}) = do + deleteSndFileInternal c userId sndFileEntityId `catchError` (notify c sndFileEntityId . SFERR) + forM_ chunks $ \ch -> deleteFileChunk ch `catchError` (notify c sndFileEntityId . SFERR) + where + deleteFileChunk :: FileChunk -> m () + deleteFileChunk FileChunk {replicas = replica@FileChunkReplica {server} : _} = do + withStore' c $ \db -> createDeletedSndChunkReplica db userId replica + addXFTPDelWorker c server + deleteFileChunk _ = pure () + +addXFTPDelWorker :: AgentMonad m => AgentClient -> XFTPServer -> m () +addXFTPDelWorker c srv = do + ws <- asks $ xftpDelWorkers . xftpAgent + atomically (TM.lookup srv ws) >>= \case + Nothing -> do + doWork <- newTMVarIO () + worker <- async $ runXFTPDelWorker c srv doWork `E.finally` atomically (TM.delete srv ws) + atomically $ TM.insert srv (doWork, worker) ws + Just (doWork, _) -> + void . atomically $ tryPutTMVar doWork () + +runXFTPDelWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m () +runXFTPDelWorker c srv doWork = do + forever $ do + void . atomically $ readTMVar doWork + atomically $ checkAgentForeground c + runXFTPOperation + where + noWorkToDo = void . atomically $ tryTakeTMVar doWork + runXFTPOperation :: m () + runXFTPOperation = do + -- no point in deleting files older than rcv ttl, as they will be expired on server + rcvFilesTTL <- asks (rcvFilesTTL . config) + nextReplica <- withStore' c $ \db -> getNextDeletedSndChunkReplica db srv rcvFilesTTL + case nextReplica of + Nothing -> noWorkToDo + Just replica@DeletedSndChunkReplica {deletedSndChunkReplicaId, userId, server, replicaId = ChunkReplicaId replId, delay} -> do + ri <- asks $ reconnectInterval . config + let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay + withRetryInterval ri' $ \delay' loop -> + deleteChunkReplica replica + `catchError` \e -> retryOnError "XFTP del worker" (retryLoop loop e delay') (retryDone e) e + where + retryLoop loop e replicaDelay = do + flip catchError (\_ -> pure ()) $ do + notifyOnRetry <- asks (xftpNotifyErrsOnRetry . config) + when notifyOnRetry $ notify c "" $ SFERR e + closeXFTPServerClient c userId server replId + withStore' c $ \db -> updateDeletedSndChunkReplicaDelay db deletedSndChunkReplicaId replicaDelay + atomically $ checkAgentForeground c + loop + retryDone e = delWorkerInternalError c deletedSndChunkReplicaId e + deleteChunkReplica :: DeletedSndChunkReplica -> m () + deleteChunkReplica replica@DeletedSndChunkReplica {userId, deletedSndChunkReplicaId} = do + agentXFTPDeleteChunk c userId replica + withStore' c $ \db -> deleteDeletedSndChunkReplica db deletedSndChunkReplicaId + +delWorkerInternalError :: AgentMonad m => AgentClient -> Int64 -> AgentErrorType -> m () +delWorkerInternalError c deletedSndChunkReplicaId e = do + withStore' c $ \db -> deleteDeletedSndChunkReplica db deletedSndChunkReplicaId + notify c "" $ SFERR e diff --git a/src/Simplex/FileTransfer/Client.hs b/src/Simplex/FileTransfer/Client.hs index 4e0499f41..fa93776d4 100644 --- a/src/Simplex/FileTransfer/Client.hs +++ b/src/Simplex/FileTransfer/Client.hs @@ -215,7 +215,5 @@ noFile HTTP2Body {bodyPart} a = case bodyPart of Just _ -> pure a -- throwError $ PCEResponseError HAS_FILE _ -> pure a --- FADD :: NonEmpty RcvPublicVerifyKey -> FileCommand Sender --- FDEL :: FileCommand Sender -- FACK :: FileCommand Recipient -- PING :: FileCommand Recipient diff --git a/src/Simplex/FileTransfer/Client/Main.hs b/src/Simplex/FileTransfer/Client/Main.hs index 3b9a838de..badf4e7c0 100644 --- a/src/Simplex/FileTransfer/Client/Main.hs +++ b/src/Simplex/FileTransfer/Client/Main.hs @@ -16,6 +16,14 @@ module Simplex.FileTransfer.Client.Main cliSendFile, cliSendFileOpts, prepareChunkSizes, + prepareChunkSpecs, + chunkSize1, + chunkSize2, + chunkSize3, + maxFileSize, + fileSizeLen, + getChunkDigest, + SentRecipientReplica (..), ) where @@ -26,6 +34,7 @@ import Control.Monad.Except import Crypto.Random (getRandomBytes) import qualified Data.Attoparsec.ByteString.Char8 as A import Data.Bifunctor (first) +import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Lazy.Char8 as LB import Data.Char (toLower) @@ -55,7 +64,7 @@ import qualified Simplex.Messaging.Crypto.Lazy as LC import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String (StrEncoding (..)) import Simplex.Messaging.Parsers (parseAll) -import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), SenderId, SndPrivateSignKey, SndPublicVerifyKey, XFTPServer, XFTPServerWithAuth) +import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), SenderId, SndPrivateSignKey, XFTPServer, XFTPServerWithAuth) import Simplex.Messaging.Server.CLI (getCliCommand') import Simplex.Messaging.Util (ifM, tshow, whenM) import System.Exit (exitFailure) @@ -319,7 +328,8 @@ cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, re logInfo $ "uploading chunk " <> tshow chunkNo <> " to " <> showServer xftpServer <> "..." (sndKey, spKey) <- liftIO $ C.generateSignatureKeyPair C.SEd25519 rKeys <- liftIO $ L.fromList <$> replicateM numRecipients (C.generateSignatureKeyPair C.SEd25519) - ch@FileInfo {digest} <- liftIO $ getChunkInfo sndKey chunkSpec + digest <- liftIO $ getChunkDigest chunkSpec + let ch = FileInfo {sndKey, size = fromIntegral chunkSize, digest} c <- withRetry retryCount $ getXFTPServerClient a xftpServer (sndId, rIds) <- withRetry retryCount $ createXFTPChunk c spKey ch (L.map fst rKeys) auth withReconnect a xftpServer retryCount $ \c' -> uploadXFTPChunk c' spKey sndId chunkSpec @@ -332,12 +342,6 @@ cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, re let recipients = L.toList $ L.map ChunkReplicaId rIds `L.zip` L.map snd rKeys replicas = [SentFileChunkReplica {server = xftpServer, recipients}] pure (chunkNo, SentFileChunk {chunkNo, sndId, sndPrivateKey = spKey, chunkSize = FileSize $ fromIntegral chunkSize, digest = FileDigest digest, replicas}) - getChunkInfo :: SndPublicVerifyKey -> XFTPChunkSpec -> IO FileInfo - getChunkInfo sndKey XFTPChunkSpec {filePath = chunkPath, chunkOffset, chunkSize} = - withFile chunkPath ReadMode $ \h -> do - hSeek h AbsoluteSeek $ fromIntegral chunkOffset - digest <- LC.sha256Hash <$> LB.hGet h (fromIntegral chunkSize) - pure FileInfo {sndKey, size = fromIntegral chunkSize, digest} getXFTPServer :: TVar StdGen -> NonEmpty XFTPServerWithAuth -> IO XFTPServerWithAuth getXFTPServer gen = \case srv :| [] -> pure srv @@ -406,6 +410,12 @@ cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, re B.writeFile fdSndPath $ strEncode fdSnd pure (fdRcvPaths, fdSndPath) +getChunkDigest :: XFTPChunkSpec -> IO ByteString +getChunkDigest XFTPChunkSpec {filePath = chunkPath, chunkOffset, chunkSize} = + withFile chunkPath ReadMode $ \h -> do + hSeek h AbsoluteSeek $ fromIntegral chunkOffset + LC.sha256Hash <$> LB.hGet h (fromIntegral chunkSize) + cliReceiveFile :: ReceiveOptions -> ExceptT CLIError IO () cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath, verbose, yes} = getFileDescription' fileDescription >>= receiveFile diff --git a/src/Simplex/FileTransfer/Description.hs b/src/Simplex/FileTransfer/Description.hs index c995f0dcd..316070f7f 100644 --- a/src/Simplex/FileTransfer/Description.hs +++ b/src/Simplex/FileTransfer/Description.hs @@ -4,7 +4,6 @@ {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE KindSignatures #-} -{-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE PatternSynonyms #-} @@ -182,7 +181,7 @@ instance FilePartyI p => StrEncoding (ValidFileDescription p) where instance StrEncoding AValidFileDescription where strEncode (AVFD fd) = strEncode fd - strDecode = validateFileDescription <=< strDecode + strDecode = (\(AFD fd) -> AVFD <$> validateFileDescription fd) <=< strDecode strP = strDecode <$?> A.takeByteString instance FilePartyI p => StrEncoding (FileDescription p) where @@ -195,15 +194,14 @@ instance StrEncoding AFileDescription where strDecode = decodeFileDescription <=< first show . Y.decodeEither' strP = strDecode <$?> A.takeByteString -validateFileDescription :: AFileDescription -> Either String AValidFileDescription -validateFileDescription = \case - AFD fd@FileDescription {size, chunks} - | chunkNos /= [1 .. length chunks] -> Left "chunk numbers are not sequential" - | chunksSize chunks /= unFileSize size -> Left "chunks total size is different than file size" - | otherwise -> Right $ AVFD (ValidFD fd) - where - chunkNos = map (chunkNo :: FileChunk -> Int) chunks - chunksSize = fromIntegral . foldl' (\s FileChunk {chunkSize} -> s + unFileSize chunkSize) 0 +validateFileDescription :: FileDescription p -> Either String (ValidFileDescription p) +validateFileDescription fd@FileDescription {size, chunks} + | chunkNos /= [1 .. length chunks] = Left "chunk numbers are not sequential" + | chunksSize chunks /= unFileSize size = Left "chunks total size is different than file size" + | otherwise = Right $ ValidFD fd + where + chunkNos = map (chunkNo :: FileChunk -> Int) chunks + chunksSize = fromIntegral . foldl' (\s FileChunk {chunkSize} -> s + unFileSize chunkSize) 0 encodeFileDescription :: FileDescription p -> YAMLFileDescription encodeFileDescription FileDescription {party, size, digest, key, nonce, chunkSize, chunks} = diff --git a/src/Simplex/FileTransfer/Types.hs b/src/Simplex/FileTransfer/Types.hs index b70d9c1c7..04d471a7c 100644 --- a/src/Simplex/FileTransfer/Types.hs +++ b/src/Simplex/FileTransfer/Types.hs @@ -11,12 +11,13 @@ import Database.SQLite.Simple.FromField (FromField (..)) import Database.SQLite.Simple.ToField (ToField (..)) import Simplex.FileTransfer.Client (XFTPChunkSpec (..)) import Simplex.FileTransfer.Description -import Simplex.Messaging.Agent.Protocol (RcvFileId) +import Simplex.Messaging.Agent.Protocol (RcvFileId, SndFileId) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String import Simplex.Messaging.Parsers (fromTextField_) import Simplex.Messaging.Protocol +import System.FilePath (()) authTagSize :: Int64 authTagSize = fromIntegral C.authTagSize @@ -111,26 +112,31 @@ data RcvFileChunkReplica = RcvFileChunkReplica type DBSndFileId = Int64 data SndFile = SndFile - { userId :: Int64, - sndFileId :: DBSndFileId, - size :: FileSize Int64, - digest :: FileDigest, + { sndFileId :: DBSndFileId, + sndFileEntityId :: SndFileId, + userId :: Int64, + numRecipients :: Int, + digest :: Maybe FileDigest, key :: C.SbKey, nonce :: C.CbNonce, - chunkSize :: FileSize Word32, - chunks :: [RcvFileChunk], - path :: FilePath, - encPath :: Maybe FilePath, - status :: SndFileStatus + chunks :: [SndFileChunk], + filePath :: FilePath, + prefixPath :: Maybe FilePath, + status :: SndFileStatus, + deleted :: Bool } deriving (Eq, Show) +sndFileEncPath :: FilePath -> FilePath +sndFileEncPath prefixPath = prefixPath "xftp.encrypted" + data SndFileStatus - = SFSNew - | SFSEncrypting - | SFSEncrypted - | SFSUploading - | SFSComplete + = SFSNew -- db record created + | SFSEncrypting -- encryption started + | SFSEncrypted -- encryption complete + | SFSUploading -- all chunk replicas are created on servers + | SFSComplete -- all chunk replicas are uploaded + | SFSError -- permanent error deriving (Eq, Show) instance FromField SndFileStatus where fromField = fromTextField_ textDecode @@ -144,6 +150,7 @@ instance TextEncoding SndFileStatus where "encrypted" -> Just SFSEncrypted "uploading" -> Just SFSUploading "complete" -> Just SFSComplete + "error" -> Just SFSError _ -> Nothing textEncode = \case SFSNew -> "new" @@ -151,16 +158,30 @@ instance TextEncoding SndFileStatus where SFSEncrypted -> "encrypted" SFSUploading -> "uploading" SFSComplete -> "complete" + SFSError -> "error" data SndFileChunk = SndFileChunk - { userId :: Int64, - sndFileId :: DBSndFileId, + { sndFileId :: DBSndFileId, + sndFileEntityId :: SndFileId, + userId :: Int64, + numRecipients :: Int, sndChunkId :: Int64, chunkNo :: Int, chunkSpec :: XFTPChunkSpec, + filePrefixPath :: FilePath, digest :: FileDigest, - replicas :: [SndFileChunkReplica], - delay :: Maybe Int + replicas :: [SndFileChunkReplica] + } + deriving (Eq, Show) + +sndChunkSize :: SndFileChunk -> Word32 +sndChunkSize SndFileChunk {chunkSpec = XFTPChunkSpec {chunkSize}} = chunkSize + +data NewSndChunkReplica = NewSndChunkReplica + { server :: XFTPServer, + replicaId :: ChunkReplicaId, + replicaKey :: C.APrivateSignKey, + rcvIdsKeys :: [(ChunkReplicaId, C.APrivateSignKey)] } deriving (Eq, Show) @@ -170,15 +191,37 @@ data SndFileChunkReplica = SndFileChunkReplica replicaId :: ChunkReplicaId, replicaKey :: C.APrivateSignKey, rcvIdsKeys :: [(ChunkReplicaId, C.APrivateSignKey)], - -- created :: Bool, - uploaded :: Bool, + replicaStatus :: SndFileReplicaStatus, + delay :: Maybe Int64, retries :: Int } deriving (Eq, Show) --- to be used in reply to client -data SndFileDescription = SndFileDescription - { description :: String, - sender :: Bool +data SndFileReplicaStatus + = SFRSCreated + | SFRSUploaded + deriving (Eq, Show) + +instance FromField SndFileReplicaStatus where fromField = fromTextField_ textDecode + +instance ToField SndFileReplicaStatus where toField = toField . textEncode + +instance TextEncoding SndFileReplicaStatus where + textDecode = \case + "created" -> Just SFRSCreated + "uploaded" -> Just SFRSUploaded + _ -> Nothing + textEncode = \case + SFRSCreated -> "created" + SFRSUploaded -> "uploaded" + +data DeletedSndChunkReplica = DeletedSndChunkReplica + { deletedSndChunkReplicaId :: Int64, + userId :: Int64, + server :: XFTPServer, + replicaId :: ChunkReplicaId, + replicaKey :: C.APrivateSignKey, + delay :: Maybe Int64, + retries :: Int } deriving (Eq, Show) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 785e76d08..2c6a14913 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -84,7 +84,9 @@ module Simplex.Messaging.Agent xftpReceiveFile, xftpDeleteRcvFile, xftpSendFile, - activateAgent, + xftpDeleteSndFileInternal, + xftpDeleteSndFileRemote, + foregroundAgent, suspendAgent, execAgentStoreSQL, getAgentMigrations, @@ -107,7 +109,7 @@ import qualified Data.ByteString.Char8 as B import Data.Composition ((.:), (.:.), (.::)) import Data.Foldable (foldl') import Data.Functor (($>)) -import Data.List (deleteFirstsBy, find, (\\)) +import Data.List (find) import Data.List.NonEmpty (NonEmpty (..), (<|)) import qualified Data.List.NonEmpty as L import Data.Map.Strict (Map) @@ -118,7 +120,7 @@ import qualified Data.Text as T import Data.Time.Clock import Data.Time.Clock.System (systemToUTCTime) import qualified Database.SQLite.Simple as DB -import Simplex.FileTransfer.Agent (closeXFTPAgent, deleteRcvFile, receiveFile, sendFileExperimental, startWorkers, toFSFilePath) +import Simplex.FileTransfer.Agent (closeXFTPAgent, deleteRcvFile, deleteSndFileInternal, deleteSndFileRemote, receiveFile, sendFile, startWorkers, toFSFilePath) import Simplex.FileTransfer.Description (ValidFileDescription) import Simplex.FileTransfer.Protocol (FileParty (..)) import Simplex.FileTransfer.Util (removePath) @@ -140,12 +142,11 @@ import Simplex.Messaging.Notifications.Protocol (DeviceToken, NtfRegCode (NtfReg import Simplex.Messaging.Notifications.Server.Push.APNS (PNMessageData (..)) import Simplex.Messaging.Notifications.Types import Simplex.Messaging.Parsers (parse) -import Simplex.Messaging.Protocol (BrokerMsg, EntityId, ErrorType (AUTH), MsgBody, MsgFlags, NtfServer, ProtoServerWithAuth, ProtocolTypeI (..), SMPMsgMeta, SProtocolType (..), SndPublicVerifyKey, UserProtocol, XFTPServerWithAuth, protoServer, sameSrvAddr') +import Simplex.Messaging.Protocol (BrokerMsg, EntityId, ErrorType (AUTH), MsgBody, MsgFlags, NtfServer, ProtoServerWithAuth, ProtocolTypeI (..), SMPMsgMeta, SProtocolType (..), SndPublicVerifyKey, UserProtocol, XFTPServerWithAuth) import qualified Simplex.Messaging.Protocol as SMP import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Util import Simplex.Messaging.Version -import System.Random (randomR) import UnliftIO.Async (async, race_) import UnliftIO.Concurrent (forkFinally, forkIO, threadDelay) import qualified UnliftIO.Exception as E @@ -349,11 +350,19 @@ xftpDeleteRcvFile c = withAgentEnv c .: deleteRcvFile c -- | Send XFTP file xftpSendFile :: AgentErrorMonad m => AgentClient -> UserId -> FilePath -> Int -> m SndFileId -xftpSendFile c = withAgentEnv c .:. sendFileExperimental c +xftpSendFile c = withAgentEnv c .:. sendFile c + +-- | Delete XFTP snd file internally (deletes work files from file system and db records) +xftpDeleteSndFileInternal :: AgentErrorMonad m => AgentClient -> UserId -> SndFileId -> m () +xftpDeleteSndFileInternal c = withAgentEnv c .: deleteSndFileInternal c + +-- | Delete XFTP snd file chunks on servers +xftpDeleteSndFileRemote :: AgentErrorMonad m => AgentClient -> UserId -> SndFileId -> ValidFileDescription 'FSender -> m () +xftpDeleteSndFileRemote c = withAgentEnv c .:. deleteSndFileRemote c -- | Activate operations -activateAgent :: MonadUnliftIO m => AgentClient -> m () -activateAgent c = withAgentEnv c $ activateAgent' c +foregroundAgent :: MonadUnliftIO m => AgentClient -> m () +foregroundAgent c = withAgentEnv c $ foregroundAgent' c -- | Suspend operations with max delay to deliver pending messages suspendAgent :: MonadUnliftIO m => AgentClient -> Int -> m () @@ -551,7 +560,7 @@ joinConn :: AgentMonad m => AgentClient -> UserId -> ConnId -> Bool -> Bool -> C joinConn c userId connId asyncMode enableNtfs cReq cInfo = do srv <- case cReq of CRInvitationUri ConnReqUriData {crSmpQueues = q :| _} _ -> - getNextSMPServer c userId [qServer q] + getNextServer c userId [qServer q] _ -> getSMPServer c userId joinConnSrv c userId connId asyncMode enableNtfs cReq cInfo srv @@ -847,13 +856,13 @@ runCommandProcessing c@AgentClient {subQ} server_ = do AClientCommand (APC _ cmd) -> case cmd of NEW enableNtfs (ACM cMode) -> noServer $ do usedSrvs <- newTVarIO ([] :: [SMPServer]) - tryCommand . withNextSrv usedSrvs [] $ \srv -> do + tryCommand . withNextSrv c userId usedSrvs [] $ \srv -> do (_, cReq) <- newRcvConnSrv c userId connId enableNtfs cMode Nothing srv notify $ INV (ACR cMode cReq) JOIN enableNtfs (ACR _ cReq@(CRInvitationUri ConnReqUriData {crSmpQueues = q :| _} _)) connInfo -> noServer $ do let initUsed = [qServer q] usedSrvs <- newTVarIO initUsed - tryCommand . withNextSrv usedSrvs initUsed $ \srv -> do + tryCommand . withNextSrv c userId usedSrvs initUsed $ \srv -> do void $ joinConnSrv c userId connId True enableNtfs cReq connInfo srv notify OK LET confId ownCInfo -> withServer' . tryCommand $ allowConnection' c connId confId ownCInfo >> notify OK @@ -933,16 +942,6 @@ runCommandProcessing c@AgentClient {subQ} server_ = do cmdError e = notify (ERR e) >> withStore' c (`deleteCommand` cmdId) notify :: forall e. AEntityI e => ACommand 'Agent e -> m () notify cmd = atomically $ writeTBQueue subQ (corrId, connId, APC (sAEntity @e) cmd) - withNextSrv :: TVar [SMPServer] -> [SMPServer] -> (SMPServerWithAuth -> m ()) -> m () - withNextSrv usedSrvs initUsed action = do - used <- readTVarIO usedSrvs - srvAuth@(ProtoServerWithAuth srv _) <- getNextSMPServer c userId used - atomically $ do - srvs_ <- TM.lookup userId $ smpServers c - let unused = maybe [] ((\\ used) . map protoServer . L.toList) srvs_ - used' = if null unused then initUsed else srv : used - writeTVar usedSrvs $! used' - action srvAuth -- ^ ^ ^ async command processing / enqueueMessages :: AgentMonad m => AgentClient -> ConnData -> NonEmpty SndQueue -> MsgFlags -> AMessage -> m AgentMsgId @@ -1023,7 +1022,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl atomically $ throwWhenNoDelivery c sq msgId <- atomically $ readTQueue mq atomically $ beginAgentOperation c AOSndNetwork - atomically $ endAgentOperation c AOMsgDelivery + atomically $ endAgentOperation c AOMsgDelivery -- this operation begins in queuePendingMsgs let mId = unId msgId E.try (withStore c $ \db -> getPendingMsgData db connId msgId) >>= \case Left (e :: E.SomeException) -> @@ -1185,8 +1184,8 @@ switchConnection' c connId = withConnLock c connId "switchConnection" $ do DuplexConnection cData@ConnData {userId} rqs@(rq@RcvQueue {server, dbQueueId, sndId} :| rqs_) sqs -> do clientVRange <- asks $ smpClientVRange . config -- try to get the server that is different from all queues, or at least from the primary rcv queue - srvAuth@(ProtoServerWithAuth srv _) <- getNextSMPServer c userId $ map qServer (L.toList rqs) <> map qServer (L.toList sqs) - srv' <- if srv == server then getNextSMPServer c userId [server] else pure srvAuth + srvAuth@(ProtoServerWithAuth srv _) <- getNextServer c userId $ map qServer (L.toList rqs) <> map qServer (L.toList sqs) + srv' <- if srv == server then getNextServer c userId [server] else pure srvAuth (q, qUri) <- newRcvQueue c userId connId srv' clientVRange let rq' = (q :: RcvQueue) {primary = True, dbReplaceQueueId = Just dbQueueId} void . withStore c $ \db -> addConnRcvQueue db connId rq' @@ -1340,11 +1339,7 @@ connectionStats = \case -- | Change servers to be used for creating new queues, in Reader monad setProtocolServers' :: forall p m. (ProtocolTypeI p, UserProtocol p, AgentMonad m) => AgentClient -> UserId -> NonEmpty (ProtoServerWithAuth p) -> m () -setProtocolServers' c userId srvs = servers >>= atomically . TM.insert userId srvs - where - servers = case protocolTypeI @p of - SPSMP -> pure $ smpServers c - SPXFTP -> pure $ xftpServers c +setProtocolServers' c userId srvs = atomically $ TM.insert userId srvs (userServers c) registerNtfToken' :: forall m. AgentMonad m => AgentClient -> DeviceToken -> NotificationsMode -> m NtfTknStatus registerNtfToken' c suppliedDeviceToken suppliedNtfMode = @@ -1543,9 +1538,9 @@ sendNtfConnCommands c cmd = do setNtfServers' :: AgentMonad' m => AgentClient -> [NtfServer] -> m () setNtfServers' c = atomically . writeTVar (ntfServers c) -activateAgent' :: AgentMonad' m => AgentClient -> m () -activateAgent' c = do - atomically $ writeTVar (agentState c) ASActive +foregroundAgent' :: AgentMonad' m => AgentClient -> m () +foregroundAgent' c = do + atomically $ writeTVar (agentState c) ASForeground mapM_ activate $ reverse agentOperations where activate opSel = atomically $ modifyTVar' (opSel c) $ \s -> s {opSuspended = False} @@ -1590,25 +1585,6 @@ debugAgentLocks' AgentClient {connLocks = cs, reconnectLocks = rs, deleteLock = getSMPServer :: AgentMonad m => AgentClient -> UserId -> m SMPServerWithAuth getSMPServer c userId = withUserServers c userId pickServer -pickServer :: AgentMonad' m => NonEmpty SMPServerWithAuth -> m SMPServerWithAuth -pickServer = \case - srv :| [] -> pure srv - servers -> do - gen <- asks randomServer - atomically $ (servers L.!!) <$> stateTVar gen (randomR (0, L.length servers - 1)) - -getNextSMPServer :: AgentMonad m => AgentClient -> UserId -> [SMPServer] -> m SMPServerWithAuth -getNextSMPServer c userId usedSrvs = withUserServers c userId $ \srvs -> - case L.nonEmpty $ deleteFirstsBy sameSrvAddr' (L.toList srvs) (map noAuthSrv usedSrvs) of - Just srvs' -> pickServer srvs' - _ -> pickServer srvs - -withUserServers :: AgentMonad m => AgentClient -> UserId -> (NonEmpty SMPServerWithAuth -> m a) -> m a -withUserServers c userId action = - atomically (TM.lookup userId $ smpServers c) >>= \case - Just srvs -> action srvs - _ -> throwError $ INTERNAL "unknown userId - no SMP servers" - subscriber :: AgentMonad' m => AgentClient -> m () subscriber c@AgentClient {msgQ} = forever $ do t <- atomically $ readTBQueue msgQ @@ -1628,6 +1604,10 @@ cleanupManager c@AgentClient {subQ} = do deleteRcvFilesExpired `catchError` (notify "" . RFERR) deleteRcvFilesDeleted `catchError` (notify "" . RFERR) deleteRcvFilesTmpPaths `catchError` (notify "" . RFERR) + deleteSndFilesExpired `catchError` (notify "" . SFERR) + deleteSndFilesDeleted `catchError` (notify "" . SFERR) + deleteSndFilesPrefixPaths `catchError` (notify "" . SFERR) + deleteExpiredReplicasForDeletion `catchError` (notify "" . SFERR) liftIO $ threadDelay' int where deleteConns = @@ -1635,7 +1615,7 @@ cleanupManager c@AgentClient {subQ} = do void $ withStore' c getDeletedConnIds >>= deleteDeletedConns c withStore' c deleteUsersWithoutConns >>= mapM_ (notify "" . DEL_USER) deleteRcvFilesExpired = do - rcvFilesTTL <- asks (rcvFilesTTL . config) + rcvFilesTTL <- asks $ rcvFilesTTL . config rcvExpired <- withStore' c (`getRcvFilesExpired` rcvFilesTTL) forM_ rcvExpired $ \(dbId, entId, p) -> flip catchError (notify entId . RFERR) $ do removePath =<< toFSFilePath p @@ -1650,6 +1630,25 @@ cleanupManager c@AgentClient {subQ} = do forM_ rcvTmpPaths $ \(dbId, entId, p) -> flip catchError (notify entId . RFERR) $ do removePath =<< toFSFilePath p withStore' c (`updateRcvFileNoTmpPath` dbId) + deleteSndFilesExpired = do + sndFilesTTL <- asks $ sndFilesTTL . config + sndExpired <- withStore' c (`getSndFilesExpired` sndFilesTTL) + forM_ sndExpired $ \(dbId, entId, p) -> flip catchError (notify entId . SFERR) $ do + forM_ p $ removePath <=< toFSFilePath + withStore' c (`deleteSndFile'` dbId) + deleteSndFilesDeleted = do + sndDeleted <- withStore' c getCleanupSndFilesDeleted + forM_ sndDeleted $ \(dbId, entId, p) -> flip catchError (notify entId . SFERR) $ do + forM_ p $ removePath <=< toFSFilePath + withStore' c (`deleteSndFile'` dbId) + deleteSndFilesPrefixPaths = do + sndPrefixPaths <- withStore' c getCleanupSndFilesPrefixPaths + forM_ sndPrefixPaths $ \(dbId, entId, p) -> flip catchError (notify entId . SFERR) $ do + removePath =<< toFSFilePath p + withStore' c (`updateSndFileNoPrefixPath` dbId) + deleteExpiredReplicasForDeletion = do + rcvFilesTTL <- asks $ rcvFilesTTL . config + withStore' c (`deleteDeletedSndChunkReplicasExpired` rcvFilesTTL) notify :: forall e. AEntityI e => EntityId -> ACommand 'Agent e -> ExceptT AgentErrorType m () notify entId cmd = atomically $ writeTBQueue subQ ("", entId, APC (sAEntity @e) cmd) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index ef9ab0710..20844d938 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -53,6 +53,10 @@ module Simplex.Messaging.Agent.Client agentNtfCheckSubscription, agentNtfDeleteSubscription, agentXFTPDownloadChunk, + agentXFTPNewChunk, + agentXFTPUploadChunk, + agentXFTPAddRecipients, + agentXFTPDeleteChunk, agentCbEncrypt, agentCbDecrypt, cryptoError, @@ -77,6 +81,8 @@ module Simplex.Messaging.Agent.Client throwWhenNoDelivery, beginAgentOperation, endAgentOperation, + waitUntilForeground, + checkAgentForeground, suspendSendingAndDatabase, suspendOperation, notifySuspended, @@ -84,6 +90,11 @@ module Simplex.Messaging.Agent.Client withStore, withStore', storeError, + userServers, + pickServer, + getNextServer, + withUserServers, + withNextSrv, ) where @@ -105,7 +116,8 @@ import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Either (lefts, partitionEithers) import Data.Functor (($>)) -import Data.List (foldl', partition) +import Data.Int (Int64) +import Data.List (deleteFirstsBy, foldl', partition, (\\)) import Data.List.NonEmpty (NonEmpty (..), (<|)) import qualified Data.List.NonEmpty as L import Data.Map.Strict (Map) @@ -119,12 +131,12 @@ import Data.Word (Word16) import qualified Database.SQLite.Simple as DB import GHC.Generics (Generic) import Network.Socket (HostName) -import Simplex.FileTransfer.Client (XFTPClient, XFTPClientConfig (..), XFTPClientError) +import Simplex.FileTransfer.Client (XFTPChunkSpec (..), XFTPClient, XFTPClientConfig (..), XFTPClientError) import qualified Simplex.FileTransfer.Client as X -import Simplex.FileTransfer.Description (ChunkReplicaId (..), kb) +import Simplex.FileTransfer.Description (ChunkReplicaId (..), FileDigest (..), kb) import Simplex.FileTransfer.Protocol (FileInfo (..), FileResponse, XFTPErrorType (DIGEST)) import Simplex.FileTransfer.Transport (XFTPRcvChunkSpec (..)) -import Simplex.FileTransfer.Types (RcvFileChunkReplica (..)) +import Simplex.FileTransfer.Types (DeletedSndChunkReplica (..), NewSndChunkReplica (..), RcvFileChunkReplica (..), SndFileChunk (..), SndFileChunkReplica (..)) import Simplex.FileTransfer.Util (uniqueCombine) import Simplex.Messaging.Agent.Env.SQLite import Simplex.Messaging.Agent.Lock @@ -156,6 +168,7 @@ import Simplex.Messaging.Protocol NtfPublicVerifyKey, NtfServer, ProtoServer, + ProtoServerWithAuth (..), Protocol (..), ProtocolServer (..), ProtocolTypeI (..), @@ -164,8 +177,12 @@ import Simplex.Messaging.Protocol RcvMessage (..), RcvNtfPublicDhKey, SMPMsgMeta (..), + SProtocolType (..), SndPublicVerifyKey, + UserProtocol, + XFTPServer, XFTPServerWithAuth, + sameSrvAddr', ) import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.TMap (TMap) @@ -173,6 +190,7 @@ import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport.Client (TransportHost) import Simplex.Messaging.Util import Simplex.Messaging.Version +import System.Random (randomR) import System.Timeout (timeout) import UnliftIO (mapConcurrently) import UnliftIO.Directory (getTemporaryDirectory) @@ -250,7 +268,7 @@ agentOperations = [ntfNetworkOp, rcvNetworkOp, msgDeliveryOp, sndNetworkOp, data data AgentOpState = AgentOpState {opSuspended :: Bool, opsInProgress :: Int} -data AgentState = ASActive | ASSuspending | ASSuspended +data AgentState = ASForeground | ASSuspending | ASSuspended deriving (Eq, Show) data AgentLocks = AgentLocks {connLocks :: Map String String, srvLocks :: Map String String, delLock :: Maybe String} @@ -295,7 +313,7 @@ newAgentClient InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = do msgDeliveryOp <- newTVar $ AgentOpState False 0 sndNetworkOp <- newTVar $ AgentOpState False 0 databaseOp <- newTVar $ AgentOpState False 0 - agentState <- newTVar ASActive + agentState <- newTVar ASForeground getMsgLocks <- TM.empty connLocks <- TM.empty deleteLock <- createLock @@ -606,9 +624,9 @@ closeClient_ c cVar = do Just (Right client) -> closeProtocolServerClient client `catchAll_` pure () _ -> pure () -closeXFTPServerClient :: AgentMonad' m => AgentClient -> UserId -> RcvFileChunkReplica -> m () -closeXFTPServerClient c userId RcvFileChunkReplica {server, replicaId = ChunkReplicaId fId} = - mkTransportSession c userId server fId >>= liftIO . closeClient c xftpClients +closeXFTPServerClient :: AgentMonad' m => AgentClient -> UserId -> XFTPServer -> ByteString -> m () +closeXFTPServerClient c userId server entityId = + mkTransportSession c userId server entityId >>= liftIO . closeClient c xftpClients cancelActions :: (Foldable f, Monoid (f (Async ()))) => TVar (f (Async ())) -> IO () cancelActions as = atomically (swapTVar as mempty) >>= mapM_ (forkIO . uninterruptibleCancel) @@ -667,9 +685,9 @@ withXFTPClient :: ByteString -> (Client msg -> ExceptT (ProtocolClientError err) IO b) -> m b -withXFTPClient c (userId, srv, fId) cmdStr action = do - tSess <- mkTransportSession c userId srv fId - withLogClient c tSess (strEncode fId) cmdStr action +withXFTPClient c (userId, srv, entityId) cmdStr action = do + tSess <- mkTransportSession c userId srv entityId + withLogClient c tSess entityId cmdStr action liftClient :: (AgentMonad m, Show err, Encoding err) => (err -> AgentErrorType) -> HostName -> ExceptT (ProtocolClientError err) IO a -> m a liftClient protocolError_ = liftError . protocolClientError protocolError_ @@ -1060,9 +1078,44 @@ agentNtfDeleteSubscription :: AgentMonad m => AgentClient -> NtfSubscriptionId - agentNtfDeleteSubscription c subId NtfToken {ntfServer, ntfPrivKey} = withNtfClient c ntfServer subId "SDEL" $ \ntf -> ntfDeleteSubscription ntf ntfPrivKey subId -agentXFTPDownloadChunk :: AgentMonad m => AgentClient -> UserId -> RcvFileChunkReplica -> XFTPRcvChunkSpec -> m () -agentXFTPDownloadChunk c userId RcvFileChunkReplica {server, replicaId = ChunkReplicaId fId, replicaKey} chunkSpec = - withXFTPClient c (userId, server, fId) "FGET" $ \xftp -> X.downloadXFTPChunk xftp replicaKey fId chunkSpec +agentXFTPDownloadChunk :: AgentMonad m => AgentClient -> UserId -> Int64 -> RcvFileChunkReplica -> XFTPRcvChunkSpec -> m () +agentXFTPDownloadChunk c userId rcvChunkId RcvFileChunkReplica {server, replicaId = ChunkReplicaId fId, replicaKey} chunkSpec = + withXFTPClient c (userId, server, bshow rcvChunkId) "FGET" $ \xftp -> X.downloadXFTPChunk xftp replicaKey fId chunkSpec + +agentXFTPNewChunk :: AgentMonad m => AgentClient -> SndFileChunk -> Int -> XFTPServerWithAuth -> m NewSndChunkReplica +agentXFTPNewChunk c SndFileChunk {userId, sndChunkId, chunkSpec = XFTPChunkSpec {chunkSize}, digest = FileDigest digest} n (ProtoServerWithAuth srv auth) = do + rKeys <- xftpRcvKeys n + (sndKey, replicaKey) <- liftIO $ C.generateSignatureKeyPair C.SEd25519 + let fileInfo = FileInfo {sndKey, size = fromIntegral chunkSize, digest} + logServer "-->" c srv "" "FNEW" + tSess <- mkTransportSession c userId srv $ bshow sndChunkId + (sndId, rIds) <- withClient c tSess "FNEW" $ \xftp -> X.createXFTPChunk xftp replicaKey fileInfo (L.map fst rKeys) auth + logServer "<--" c srv "" $ B.unwords ["SIDS", logSecret sndId] + pure NewSndChunkReplica {server = srv, replicaId = ChunkReplicaId sndId, replicaKey, rcvIdsKeys = L.toList $ xftpRcvIdsKeys rIds rKeys} + +agentXFTPUploadChunk :: AgentMonad m => AgentClient -> UserId -> Int64 -> SndFileChunkReplica -> XFTPChunkSpec -> m () +agentXFTPUploadChunk c userId sndChunkId SndFileChunkReplica {server, replicaId = ChunkReplicaId fId, replicaKey} chunkSpec = + withXFTPClient c (userId, server, bshow sndChunkId) "FPUT" $ \xftp -> X.uploadXFTPChunk xftp replicaKey fId chunkSpec + +agentXFTPAddRecipients :: AgentMonad m => AgentClient -> UserId -> Int64 -> SndFileChunkReplica -> Int -> m (NonEmpty (ChunkReplicaId, C.APrivateSignKey)) +agentXFTPAddRecipients c userId sndChunkId SndFileChunkReplica {server, replicaId = ChunkReplicaId fId, replicaKey} n = do + rKeys <- xftpRcvKeys n + rIds <- withXFTPClient c (userId, server, bshow sndChunkId) "FADD" $ \xftp -> X.addXFTPRecipients xftp replicaKey fId (L.map fst rKeys) + pure $ xftpRcvIdsKeys rIds rKeys + +agentXFTPDeleteChunk :: AgentMonad m => AgentClient -> UserId -> DeletedSndChunkReplica -> m () +agentXFTPDeleteChunk c userId DeletedSndChunkReplica {server, replicaId = ChunkReplicaId fId, replicaKey} = + withXFTPClient c (userId, server, fId) "FDEL" $ \xftp -> X.deleteXFTPChunk xftp replicaKey fId + +xftpRcvKeys :: AgentMonad m => Int -> m (NonEmpty C.ASignatureKeyPair) +xftpRcvKeys n = do + rKeys <- liftIO $ replicateM n $ C.generateSignatureKeyPair C.SEd25519 + case L.nonEmpty rKeys of + Just rKeys' -> pure rKeys' + _ -> throwError $ INTERNAL "non-positive number of recipients" + +xftpRcvIdsKeys :: NonEmpty ByteString -> NonEmpty C.ASignatureKeyPair -> NonEmpty (ChunkReplicaId, C.APrivateSignKey) +xftpRcvIdsKeys rIds rKeys = L.map ChunkReplicaId rIds `L.zip` L.map snd rKeys agentCbEncrypt :: AgentMonad m => SndQueue -> Maybe C.PublicKeyX25519 -> ByteString -> m ByteString agentCbEncrypt SndQueue {e2eDhSecret, smpClientVersion} e2ePubKey msg = do @@ -1161,6 +1214,14 @@ agentOperationBracket c op check action = (\_ -> atomically $ endAgentOperation c op) (const action) +waitUntilForeground :: AgentClient -> STM () +waitUntilForeground c = unlessM ((ASForeground ==) <$> readTVar (agentState c)) retry + +checkAgentForeground :: AgentClient -> STM () +checkAgentForeground c = do + throwWhenInactive c + waitUntilForeground c + withStore' :: AgentMonad m => AgentClient -> (DB.Connection -> IO a) -> m a withStore' c action = withStore c $ fmap Right . action @@ -1208,3 +1269,38 @@ incClientStatN c userId pc n cmd res = do atomically $ incStat c n statsKey where statsKey = AgentStatsKey {userId, host = strEncode $ clientTransportHost pc, clientTs = strEncode $ clientSessionTs pc, cmd, res} + +userServers :: forall p. (ProtocolTypeI p, UserProtocol p) => AgentClient -> TMap UserId (NonEmpty (ProtoServerWithAuth p)) +userServers c = case protocolTypeI @p of + SPSMP -> smpServers c + SPXFTP -> xftpServers c + +pickServer :: forall p m. (AgentMonad' m) => NonEmpty (ProtoServerWithAuth p) -> m (ProtoServerWithAuth p) +pickServer = \case + srv :| [] -> pure srv + servers -> do + gen <- asks randomServer + atomically $ (servers L.!!) <$> stateTVar gen (randomR (0, L.length servers - 1)) + +getNextServer :: forall p m. (ProtocolTypeI p, UserProtocol p, AgentMonad m) => AgentClient -> UserId -> [ProtocolServer p] -> m (ProtoServerWithAuth p) +getNextServer c userId usedSrvs = withUserServers c userId $ \srvs -> + case L.nonEmpty $ deleteFirstsBy sameSrvAddr' (L.toList srvs) (map noAuthSrv usedSrvs) of + Just srvs' -> pickServer srvs' + _ -> pickServer srvs + +withUserServers :: forall p m a. (ProtocolTypeI p, UserProtocol p, AgentMonad m) => AgentClient -> UserId -> (NonEmpty (ProtoServerWithAuth p) -> m a) -> m a +withUserServers c userId action = + atomically (TM.lookup userId $ userServers c) >>= \case + Just srvs -> action srvs + _ -> throwError $ INTERNAL "unknown userId - no user servers" + +withNextSrv :: forall p m a. (ProtocolTypeI p, UserProtocol p, AgentMonad m) => AgentClient -> UserId -> TVar [ProtocolServer p] -> [ProtocolServer p] -> ((ProtoServerWithAuth p) -> m a) -> m a +withNextSrv c userId usedSrvs initUsed action = do + used <- readTVarIO usedSrvs + srvAuth@(ProtoServerWithAuth srv _) <- getNextServer c userId used + atomically $ do + srvs_ <- TM.lookup userId $ userServers c + let unused = maybe [] ((\\ used) . map protoServer . L.toList) srvs_ + used' = if null unused then initUsed else srv : used + writeTVar usedSrvs $! used' + action srvAuth diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index 7cb7509f0..40772ee22 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -83,7 +83,9 @@ data AgentConfig = AgentConfig initialCleanupDelay :: Int64, cleanupInterval :: Int64, rcvFilesTTL :: NominalDiffTime, + sndFilesTTL :: NominalDiffTime, xftpNotifyErrsOnRetry :: Bool, + xftpMaxRecipientsPerRequest :: Int, deleteErrorCount :: Int, ntfCron :: Word16, ntfWorkerDelay :: Int, @@ -144,7 +146,9 @@ defaultAgentConfig = initialCleanupDelay = 30 * 1000000, -- 30 seconds cleanupInterval = 30 * 60 * 1000000, -- 30 minutes rcvFilesTTL = 2 * nominalDay, + sndFilesTTL = nominalDay, xftpNotifyErrsOnRetry = True, + xftpMaxRecipientsPerRequest = 200, deleteErrorCount = 10, ntfCron = 20, -- minutes ntfWorkerDelay = 100000, -- microseconds @@ -205,17 +209,15 @@ newNtfSubSupervisor qSize = do data XFTPAgent = XFTPAgent { -- if set, XFTP file paths will be considered as relative to this directory xftpWorkDir :: TVar (Maybe FilePath), - xftpWorkers :: TMap (Maybe XFTPServer) (TMVar (), Async ()) - -- separate send workers for unhindered concurrency between download and upload, - -- clients can also be separate by passing direction to withXFTPClient, and differentiating by it - -- xftpSndWorkers :: TMap (Maybe XFTPServer) (TMVar (), Async ()), - -- files currently in upload - to throttle upload of other files' chunks, - -- this optimization can be dropped for the MVP - -- xftpSndFiles :: TVar (Set DBSndFileId) + xftpRcvWorkers :: TMap (Maybe XFTPServer) (TMVar (), Async ()), + xftpSndWorkers :: TMap (Maybe XFTPServer) (TMVar (), Async ()), + xftpDelWorkers :: TMap XFTPServer (TMVar (), Async ()) } newXFTPAgent :: STM XFTPAgent newXFTPAgent = do xftpWorkDir <- newTVar Nothing - xftpWorkers <- TM.empty - pure XFTPAgent {xftpWorkDir, xftpWorkers} + xftpRcvWorkers <- TM.empty + xftpSndWorkers <- TM.empty + xftpDelWorkers <- TM.empty + pure XFTPAgent {xftpWorkDir, xftpRcvWorkers, xftpSndWorkers, xftpDelWorkers} diff --git a/src/Simplex/Messaging/Agent/RetryInterval.hs b/src/Simplex/Messaging/Agent/RetryInterval.hs index 7f88592fc..97d537a5a 100644 --- a/src/Simplex/Messaging/Agent/RetryInterval.hs +++ b/src/Simplex/Messaging/Agent/RetryInterval.hs @@ -48,10 +48,10 @@ updateRetryInterval2 RI2State {slowInterval, fastInterval} RetryInterval2 {riSlo data RetryIntervalMode = RISlow | RIFast deriving (Eq, Show) -withRetryInterval :: forall m. MonadIO m => RetryInterval -> (Int64 -> m () -> m ()) -> m () +withRetryInterval :: forall m a. MonadIO m => RetryInterval -> (Int64 -> m a -> m a) -> m a withRetryInterval ri action = callAction 0 $ initialInterval ri where - callAction :: Int64 -> Int64 -> m () + callAction :: Int64 -> Int64 -> m a callAction elapsed delay = action delay loop where loop = do diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index 248e52fc9..2db7264ba 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -539,4 +539,6 @@ data StoreError SEXFTPServerNotFound | -- | XFTP File not found. SEFileNotFound + | -- | XFTP Deleted snd chunk replica not found. + SEDeletedSndChunkReplicaNotFound deriving (Eq, Show, Exception) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 99e87fc3f..9d8246991 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -129,7 +129,10 @@ module Simplex.Messaging.Agent.Store.SQLite getActiveNtfToken, getNtfRcvQueue, setConnectionNtfs, - -- File transfer + + -- * File transfer + + -- Rcv files createRcvFile, getRcvFile, getRcvFileByEntityId, @@ -147,6 +150,34 @@ module Simplex.Messaging.Agent.Store.SQLite getCleanupRcvFilesTmpPaths, getCleanupRcvFilesDeleted, getRcvFilesExpired, + -- Snd files + createSndFile, + getSndFile, + getSndFileByEntityId, + getNextSndFileToPrepare, + updateSndFileError, + updateSndFileStatus, + updateSndFileEncrypted, + updateSndFileComplete, + updateSndFileNoPrefixPath, + updateSndFileDeleted, + deleteSndFile', + getSndFileDeleted, + createSndFileReplica, + getNextSndChunkToUpload, + updateSndChunkReplicaDelay, + addSndChunkReplicaRecipients, + updateSndChunkReplicaStatus, + getPendingSndFilesServers, + getCleanupSndFilesPrefixPaths, + getCleanupSndFilesDeleted, + getSndFilesExpired, + createDeletedSndChunkReplica, + getNextDeletedSndChunkReplica, + updateDeletedSndChunkReplicaDelay, + deleteDeletedSndChunkReplica, + getPendingDelFilesServers, + deleteDeletedSndChunkReplicasExpired, -- * utilities withConnection, @@ -191,6 +222,7 @@ import Database.SQLite.Simple.ToField (ToField (..)) import qualified Database.SQLite3 as SQLite3 import GHC.Generics (Generic) import Network.Socket (ServiceName) +import Simplex.FileTransfer.Client (XFTPChunkSpec (..)) import Simplex.FileTransfer.Description import Simplex.FileTransfer.Protocol (FileParty (..)) import Simplex.FileTransfer.Types @@ -1868,9 +1900,9 @@ getRcvFileIdByEntityId_ db userId rcvFileEntityId = getRcvFile :: DB.Connection -> DBRcvFileId -> IO (Either StoreError RcvFile) getRcvFile db rcvFileId = runExceptT $ do - fd@RcvFile {rcvFileEntityId, userId, tmpPath} <- ExceptT getFile + f@RcvFile {rcvFileEntityId, userId, tmpPath} <- ExceptT getFile chunks <- maybe (pure []) (liftIO . getChunks rcvFileEntityId userId) tmpPath - pure (fd {chunks} :: RcvFile) + pure (f {chunks} :: RcvFile) where getFile :: IO (Either StoreError RcvFile) getFile = do @@ -1931,12 +1963,11 @@ updateRcvChunkReplicaDelay db replicaId delay = do updatedAt <- getCurrentTime DB.execute db "UPDATE rcv_file_chunk_replicas SET delay = ?, retries = retries + 1, updated_at = ? WHERE rcv_file_chunk_replica_id = ?" (delay, updatedAt, replicaId) -updateRcvFileChunkReceived :: DB.Connection -> Int64 -> Int64 -> DBRcvFileId -> FilePath -> IO (Either StoreError RcvFile) -updateRcvFileChunkReceived db rId cId fId chunkTmpPath = do +updateRcvFileChunkReceived :: DB.Connection -> Int64 -> Int64 -> FilePath -> IO () +updateRcvFileChunkReceived db replicaId chunkId chunkTmpPath = do updatedAt <- getCurrentTime - DB.execute db "UPDATE rcv_file_chunk_replicas SET received = 1, updated_at = ? WHERE rcv_file_chunk_replica_id = ?" (updatedAt, rId) - DB.execute db "UPDATE rcv_file_chunks SET tmp_path = ?, updated_at = ? WHERE rcv_file_chunk_id = ?" (chunkTmpPath, updatedAt, cId) - getRcvFile db fId + DB.execute db "UPDATE rcv_file_chunk_replicas SET received = 1, updated_at = ? WHERE rcv_file_chunk_replica_id = ?" (updatedAt, replicaId) + DB.execute db "UPDATE rcv_file_chunks SET tmp_path = ?, updated_at = ? WHERE rcv_file_chunk_id = ?" (chunkTmpPath, updatedAt, chunkId) updateRcvFileStatus :: DB.Connection -> DBRcvFileId -> RcvFileStatus -> IO () updateRcvFileStatus db rcvFileId status = do @@ -2025,7 +2056,7 @@ getNextRcvFileToDecrypt db ttl = do getPendingRcvFilesServers :: DB.Connection -> NominalDiffTime -> IO [XFTPServer] getPendingRcvFilesServers db ttl = do cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime - map toServer + map toXFTPServer <$> DB.query db [sql| @@ -2039,9 +2070,9 @@ getPendingRcvFilesServers db ttl = do AND f.status = ? AND f.deleted = 0 AND f.created_at >= ? |] (RFSReceiving, cutoffTs) - where - toServer :: (NonEmpty TransportHost, ServiceName, C.KeyHash) -> XFTPServer - toServer (host, port, keyHash) = XFTPServer host port keyHash + +toXFTPServer :: (NonEmpty TransportHost, ServiceName, C.KeyHash) -> XFTPServer +toXFTPServer (host, port, keyHash) = XFTPServer host port keyHash getCleanupRcvFilesTmpPaths :: DB.Connection -> IO [(DBRcvFileId, RcvFileId, FilePath)] getCleanupRcvFilesTmpPaths db = @@ -2075,3 +2106,375 @@ getRcvFilesExpired db ttl = do WHERE created_at < ? |] (Only cutoffTs) + +createSndFile :: DB.Connection -> TVar ChaChaDRG -> UserId -> Int -> FilePath -> FilePath -> C.SbKey -> C.CbNonce -> IO (Either StoreError SndFileId) +createSndFile db gVar userId numRecipients path prefixPath key nonce = + createWithRandomId gVar $ \sndFileEntityId -> + DB.execute + db + "INSERT INTO snd_files (snd_file_entity_id, user_id, num_recipients, key, nonce, path, prefix_path, status) VALUES (?,?,?,?,?,?,?,?)" + (sndFileEntityId, userId, numRecipients, key, nonce, path, prefixPath, SFSNew) + +getSndFileByEntityId :: DB.Connection -> UserId -> SndFileId -> IO (Either StoreError SndFile) +getSndFileByEntityId db userId sndFileEntityId = runExceptT $ do + sndFileId <- ExceptT $ getSndFileIdByEntityId_ db userId sndFileEntityId + ExceptT $ getSndFile db sndFileId + +getSndFileIdByEntityId_ :: DB.Connection -> UserId -> SndFileId -> IO (Either StoreError DBSndFileId) +getSndFileIdByEntityId_ db userId sndFileEntityId = + firstRow fromOnly SEFileNotFound $ + DB.query db "SELECT snd_file_id FROM snd_files WHERE user_id = ? AND snd_file_entity_id = ?" (userId, sndFileEntityId) + +getSndFile :: DB.Connection -> DBSndFileId -> IO (Either StoreError SndFile) +getSndFile db sndFileId = runExceptT $ do + f@SndFile {sndFileEntityId, userId, numRecipients, prefixPath} <- ExceptT getFile + chunks <- maybe (pure []) (liftIO . getChunks sndFileEntityId userId numRecipients) prefixPath + pure (f {chunks} :: SndFile) + where + getFile :: IO (Either StoreError SndFile) + getFile = do + firstRow toFile SEFileNotFound $ + DB.query + db + [sql| + SELECT snd_file_entity_id, user_id, num_recipients, digest, key, nonce, path, prefix_path, status, deleted + FROM snd_files + WHERE snd_file_id = ? + |] + (Only sndFileId) + where + toFile :: (SndFileId, UserId, Int, Maybe FileDigest, C.SbKey, C.CbNonce, FilePath, Maybe FilePath, SndFileStatus, Bool) -> SndFile + toFile (sndFileEntityId, userId, numRecipients, digest, key, nonce, filePath, prefixPath, status, deleted) = + SndFile {sndFileId, sndFileEntityId, userId, numRecipients, digest, key, nonce, filePath, prefixPath, status, deleted, chunks = []} + getChunks :: SndFileId -> UserId -> Int -> FilePath -> IO [SndFileChunk] + getChunks sndFileEntityId userId numRecipients filePrefixPath = do + chunks <- + map toChunk + <$> DB.query + db + [sql| + SELECT snd_file_chunk_id, chunk_no, chunk_offset, chunk_size, digest + FROM snd_file_chunks + WHERE snd_file_id = ? + |] + (Only sndFileId) + forM chunks $ \chunk@SndFileChunk {sndChunkId} -> do + replicas' <- getChunkReplicas sndChunkId + pure (chunk {replicas = replicas'} :: SndFileChunk) + where + toChunk :: (Int64, Int, Int64, Word32, FileDigest) -> SndFileChunk + toChunk (sndChunkId, chunkNo, chunkOffset, chunkSize, digest) = + let chunkSpec = XFTPChunkSpec {filePath = sndFileEncPath filePrefixPath, chunkOffset, chunkSize} + in SndFileChunk {sndFileId, sndFileEntityId, userId, numRecipients, sndChunkId, chunkNo, chunkSpec, filePrefixPath, digest, replicas = []} + getChunkReplicas :: Int64 -> IO [SndFileChunkReplica] + getChunkReplicas chunkId = do + replicas <- + map toReplica + <$> DB.query + db + [sql| + SELECT + r.snd_file_chunk_replica_id, r.replica_id, r.replica_key, r.replica_status, r.delay, r.retries, + s.xftp_host, s.xftp_port, s.xftp_key_hash + FROM snd_file_chunk_replicas r + JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id + WHERE r.snd_file_chunk_id = ? + |] + (Only chunkId) + forM replicas $ \replica@SndFileChunkReplica {sndChunkReplicaId} -> do + rcvIdsKeys <- getChunkReplicaRecipients_ db sndChunkReplicaId + pure (replica :: SndFileChunkReplica) {rcvIdsKeys} + where + toReplica :: (Int64, ChunkReplicaId, C.APrivateSignKey, SndFileReplicaStatus, Maybe Int64, Int, NonEmpty TransportHost, ServiceName, C.KeyHash) -> SndFileChunkReplica + toReplica (sndChunkReplicaId, replicaId, replicaKey, replicaStatus, delay, retries, host, port, keyHash) = + let server = XFTPServer host port keyHash + in SndFileChunkReplica {sndChunkReplicaId, server, replicaId, replicaKey, replicaStatus, delay, retries, rcvIdsKeys = []} + +getChunkReplicaRecipients_ :: DB.Connection -> Int64 -> IO [(ChunkReplicaId, C.APrivateSignKey)] +getChunkReplicaRecipients_ db replicaId = + DB.query + db + [sql| + SELECT rcv_replica_id, rcv_replica_key + FROM snd_file_chunk_replica_recipients + WHERE snd_file_chunk_replica_id = ? + |] + (Only replicaId) + +getNextSndFileToPrepare :: DB.Connection -> NominalDiffTime -> IO (Maybe SndFile) +getNextSndFileToPrepare db ttl = do + cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime + fileId_ :: Maybe DBSndFileId <- + maybeFirstRow fromOnly $ + DB.query + db + [sql| + SELECT snd_file_id + FROM snd_files + WHERE status IN (?,?,?) AND deleted = 0 AND created_at >= ? + ORDER BY created_at ASC LIMIT 1 + |] + (SFSNew, SFSEncrypting, SFSEncrypted, cutoffTs) + case fileId_ of + Nothing -> pure Nothing + Just fileId -> eitherToMaybe <$> getSndFile db fileId + +updateSndFileError :: DB.Connection -> DBSndFileId -> String -> IO () +updateSndFileError db sndFileId errStr = do + updatedAt <- getCurrentTime + DB.execute db "UPDATE snd_files SET prefix_path = NULL, error = ?, status = ?, updated_at = ? WHERE snd_file_id = ?" (errStr, SFSError, updatedAt, sndFileId) + +updateSndFileStatus :: DB.Connection -> DBSndFileId -> SndFileStatus -> IO () +updateSndFileStatus db sndFileId status = do + updatedAt <- getCurrentTime + DB.execute db "UPDATE snd_files SET status = ?, updated_at = ? WHERE snd_file_id = ?" (status, updatedAt, sndFileId) + +updateSndFileEncrypted :: DB.Connection -> DBSndFileId -> FileDigest -> [(XFTPChunkSpec, FileDigest)] -> IO () +updateSndFileEncrypted db sndFileId digest chunkSpecsDigests = do + updatedAt <- getCurrentTime + DB.execute db "UPDATE snd_files SET status = ?, digest = ?, updated_at = ? WHERE snd_file_id = ?" (SFSEncrypted, digest, updatedAt, sndFileId) + forM_ (zip [1 ..] chunkSpecsDigests) $ \(chunkNo :: Int, (XFTPChunkSpec {chunkOffset, chunkSize}, chunkDigest)) -> + DB.execute db "INSERT INTO snd_file_chunks (snd_file_id, chunk_no, chunk_offset, chunk_size, digest) VALUES (?,?,?,?,?)" (sndFileId, chunkNo, chunkOffset, chunkSize, chunkDigest) + +updateSndFileComplete :: DB.Connection -> DBSndFileId -> IO () +updateSndFileComplete db sndFileId = do + updatedAt <- getCurrentTime + DB.execute db "UPDATE snd_files SET prefix_path = NULL, status = ?, updated_at = ? WHERE snd_file_id = ?" (SFSComplete, updatedAt, sndFileId) + +updateSndFileNoPrefixPath :: DB.Connection -> DBSndFileId -> IO () +updateSndFileNoPrefixPath db sndFileId = do + updatedAt <- getCurrentTime + DB.execute db "UPDATE snd_files SET prefix_path = NULL, updated_at = ? WHERE snd_file_id = ?" (updatedAt, sndFileId) + +updateSndFileDeleted :: DB.Connection -> DBSndFileId -> IO () +updateSndFileDeleted db sndFileId = do + updatedAt <- getCurrentTime + DB.execute db "UPDATE snd_files SET deleted = 1, updated_at = ? WHERE snd_file_id = ?" (updatedAt, sndFileId) + +deleteSndFile' :: DB.Connection -> DBSndFileId -> IO () +deleteSndFile' db sndFileId = + DB.execute db "DELETE FROM snd_files WHERE snd_file_id = ?" (Only sndFileId) + +getSndFileDeleted :: DB.Connection -> DBSndFileId -> IO Bool +getSndFileDeleted db sndFileId = + fromMaybe True + <$> maybeFirstRow fromOnly (DB.query db "SELECT deleted FROM snd_files WHERE snd_file_id = ?" (Only sndFileId)) + +createSndFileReplica :: DB.Connection -> SndFileChunk -> NewSndChunkReplica -> IO () +createSndFileReplica db SndFileChunk {sndChunkId} NewSndChunkReplica {server, replicaId, replicaKey, rcvIdsKeys} = do + srvId <- createXFTPServer_ db server + DB.execute + db + [sql| + INSERT INTO snd_file_chunk_replicas + (snd_file_chunk_id, replica_number, xftp_server_id, replica_id, replica_key, replica_status) + VALUES (?,?,?,?,?,?) + |] + (sndChunkId, 1 :: Int, srvId, replicaId, replicaKey, SFRSCreated) + rId <- insertedRowId db + forM_ rcvIdsKeys $ \(rcvId, rcvKey) -> do + DB.execute + db + [sql| + INSERT INTO snd_file_chunk_replica_recipients + (snd_file_chunk_replica_id, rcv_replica_id, rcv_replica_key) + VALUES (?,?,?) + |] + (rId, rcvId, rcvKey) + +getNextSndChunkToUpload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Maybe SndFileChunk) +getNextSndChunkToUpload db server@ProtocolServer {host, port, keyHash} ttl = do + cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime + chunk_ <- + maybeFirstRow toChunk $ + DB.query + db + [sql| + SELECT + f.snd_file_id, f.snd_file_entity_id, f.user_id, f.num_recipients, f.prefix_path, + c.snd_file_chunk_id, c.chunk_no, c.chunk_offset, c.chunk_size, c.digest, + r.snd_file_chunk_replica_id, r.replica_id, r.replica_key, r.replica_status, r.delay, r.retries + FROM snd_file_chunk_replicas r + JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id + JOIN snd_file_chunks c ON c.snd_file_chunk_id = r.snd_file_chunk_id + JOIN snd_files f ON f.snd_file_id = c.snd_file_id + WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ? + AND r.replica_status = ? AND r.replica_number = 1 + AND (f.status = ? OR f.status = ?) AND f.deleted = 0 AND f.created_at >= ? + ORDER BY r.created_at ASC + LIMIT 1 + |] + (host, port, keyHash, SFRSCreated, SFSEncrypted, SFSUploading, cutoffTs) + forM chunk_ $ \chunk@SndFileChunk {replicas} -> do + replicas' <- forM replicas $ \replica@SndFileChunkReplica {sndChunkReplicaId} -> do + rcvIdsKeys <- getChunkReplicaRecipients_ db sndChunkReplicaId + pure (replica :: SndFileChunkReplica) {rcvIdsKeys} + pure (chunk {replicas = replicas'} :: SndFileChunk) + where + toChunk :: ((DBSndFileId, SndFileId, UserId, Int, FilePath) :. (Int64, Int, Int64, Word32, FileDigest) :. (Int64, ChunkReplicaId, C.APrivateSignKey, SndFileReplicaStatus, Maybe Int64, Int)) -> SndFileChunk + toChunk ((sndFileId, sndFileEntityId, userId, numRecipients, filePrefixPath) :. (sndChunkId, chunkNo, chunkOffset, chunkSize, digest) :. (sndChunkReplicaId, replicaId, replicaKey, replicaStatus, delay, retries)) = + let chunkSpec = XFTPChunkSpec {filePath = sndFileEncPath filePrefixPath, chunkOffset, chunkSize} + in SndFileChunk + { sndFileId, + sndFileEntityId, + userId, + numRecipients, + sndChunkId, + chunkNo, + chunkSpec, + digest, + filePrefixPath, + replicas = [SndFileChunkReplica {sndChunkReplicaId, server, replicaId, replicaKey, replicaStatus, delay, retries, rcvIdsKeys = []}] + } + +updateSndChunkReplicaDelay :: DB.Connection -> Int64 -> Int64 -> IO () +updateSndChunkReplicaDelay db replicaId delay = do + updatedAt <- getCurrentTime + DB.execute db "UPDATE snd_file_chunk_replicas SET delay = ?, retries = retries + 1, updated_at = ? WHERE snd_file_chunk_replica_id = ?" (delay, updatedAt, replicaId) + +addSndChunkReplicaRecipients :: DB.Connection -> SndFileChunkReplica -> [(ChunkReplicaId, C.APrivateSignKey)] -> IO SndFileChunkReplica +addSndChunkReplicaRecipients db r@SndFileChunkReplica {sndChunkReplicaId} rcvIdsKeys = do + forM_ rcvIdsKeys $ \(rcvId, rcvKey) -> do + DB.execute + db + [sql| + INSERT INTO snd_file_chunk_replica_recipients + (snd_file_chunk_replica_id, rcv_replica_id, rcv_replica_key) + VALUES (?,?,?) + |] + (sndChunkReplicaId, rcvId, rcvKey) + rcvIdsKeys' <- getChunkReplicaRecipients_ db sndChunkReplicaId + pure (r :: SndFileChunkReplica) {rcvIdsKeys = rcvIdsKeys'} + +updateSndChunkReplicaStatus :: DB.Connection -> Int64 -> SndFileReplicaStatus -> IO () +updateSndChunkReplicaStatus db replicaId status = do + updatedAt <- getCurrentTime + DB.execute db "UPDATE snd_file_chunk_replicas SET replica_status = ?, updated_at = ? WHERE snd_file_chunk_replica_id = ?" (status, updatedAt, replicaId) + +getPendingSndFilesServers :: DB.Connection -> NominalDiffTime -> IO [XFTPServer] +getPendingSndFilesServers db ttl = do + cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime + map toXFTPServer + <$> DB.query + db + [sql| + SELECT DISTINCT + s.xftp_host, s.xftp_port, s.xftp_key_hash + FROM snd_file_chunk_replicas r + JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id + JOIN snd_file_chunks c ON c.snd_file_chunk_id = r.snd_file_chunk_id + JOIN snd_files f ON f.snd_file_id = c.snd_file_id + WHERE r.replica_status = ? AND r.replica_number = 1 + AND (f.status = ? OR f.status = ?) AND f.deleted = 0 AND f.created_at >= ? + |] + (SFRSCreated, SFSEncrypted, SFSUploading, cutoffTs) + +getCleanupSndFilesPrefixPaths :: DB.Connection -> IO [(DBSndFileId, SndFileId, FilePath)] +getCleanupSndFilesPrefixPaths db = + DB.query + db + [sql| + SELECT snd_file_id, snd_file_entity_id, prefix_path + FROM snd_files + WHERE status IN (?,?) AND prefix_path IS NOT NULL + |] + (SFSComplete, SFSError) + +getCleanupSndFilesDeleted :: DB.Connection -> IO [(DBSndFileId, SndFileId, Maybe FilePath)] +getCleanupSndFilesDeleted db = + DB.query_ + db + [sql| + SELECT snd_file_id, snd_file_entity_id, prefix_path + FROM snd_files + WHERE deleted = 1 + |] + +getSndFilesExpired :: DB.Connection -> NominalDiffTime -> IO [(DBSndFileId, SndFileId, Maybe FilePath)] +getSndFilesExpired db ttl = do + cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime + DB.query + db + [sql| + SELECT snd_file_id, snd_file_entity_id, prefix_path + FROM snd_files + WHERE created_at < ? + |] + (Only cutoffTs) + +createDeletedSndChunkReplica :: DB.Connection -> UserId -> FileChunkReplica -> IO () +createDeletedSndChunkReplica db userId FileChunkReplica {server, replicaId, replicaKey} = do + srvId <- createXFTPServer_ db server + DB.execute + db + "INSERT INTO deleted_snd_chunk_replicas (user_id, xftp_server_id, replica_id, replica_key) VALUES (?,?,?,?)" + (userId, srvId, replicaId, replicaKey) + +getDeletedSndChunkReplica :: DB.Connection -> DBSndFileId -> IO (Either StoreError DeletedSndChunkReplica) +getDeletedSndChunkReplica db deletedSndChunkReplicaId = + firstRow toReplica SEDeletedSndChunkReplicaNotFound $ + DB.query + db + [sql| + SELECT + r.user_id, r.replica_id, r.replica_key, r.delay, r.retries, + s.xftp_host, s.xftp_port, s.xftp_key_hash + FROM deleted_snd_chunk_replicas r + JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id + WHERE r.deleted_snd_chunk_replica_id = ? + |] + (Only deletedSndChunkReplicaId) + where + toReplica :: (UserId, ChunkReplicaId, C.APrivateSignKey, Maybe Int64, Int, NonEmpty TransportHost, ServiceName, C.KeyHash) -> DeletedSndChunkReplica + toReplica (userId, replicaId, replicaKey, delay, retries, host, port, keyHash) = + let server = XFTPServer host port keyHash + in DeletedSndChunkReplica {deletedSndChunkReplicaId, userId, server, replicaId, replicaKey, delay, retries} + +getNextDeletedSndChunkReplica :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Maybe DeletedSndChunkReplica) +getNextDeletedSndChunkReplica db ProtocolServer {host, port, keyHash} ttl = do + cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime + replicaId_ :: Maybe Int64 <- + maybeFirstRow fromOnly $ + DB.query + db + [sql| + SELECT r.deleted_snd_chunk_replica_id + FROM deleted_snd_chunk_replicas r + JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id + WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ? + AND r.created_at >= ? + ORDER BY r.created_at ASC LIMIT 1 + |] + (host, port, keyHash, cutoffTs) + case replicaId_ of + Nothing -> pure Nothing + Just replicaId -> eitherToMaybe <$> getDeletedSndChunkReplica db replicaId + +updateDeletedSndChunkReplicaDelay :: DB.Connection -> Int64 -> Int64 -> IO () +updateDeletedSndChunkReplicaDelay db deletedSndChunkReplicaId delay = do + updatedAt <- getCurrentTime + DB.execute db "UPDATE deleted_snd_chunk_replicas SET delay = ?, retries = retries + 1, updated_at = ? WHERE deleted_snd_chunk_replica_id = ?" (delay, updatedAt, deletedSndChunkReplicaId) + +deleteDeletedSndChunkReplica :: DB.Connection -> Int64 -> IO () +deleteDeletedSndChunkReplica db deletedSndChunkReplicaId = + DB.execute db "DELETE FROM deleted_snd_chunk_replicas WHERE deleted_snd_chunk_replica_id = ?" (Only deletedSndChunkReplicaId) + +getPendingDelFilesServers :: DB.Connection -> NominalDiffTime -> IO [XFTPServer] +getPendingDelFilesServers db ttl = do + cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime + map toXFTPServer + <$> DB.query + db + [sql| + SELECT DISTINCT + s.xftp_host, s.xftp_port, s.xftp_key_hash + FROM deleted_snd_chunk_replicas r + JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id + WHERE r.created_at >= ? + |] + (Only cutoffTs) + +deleteDeletedSndChunkReplicasExpired :: DB.Connection -> NominalDiffTime -> IO () +deleteDeletedSndChunkReplicasExpired db ttl = do + cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime + DB.execute db "DELETE FROM deleted_snd_chunk_replicas WHERE created_at < ?" (Only cutoffTs) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs index 44dc85a07..182c5effa 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs @@ -57,6 +57,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230120_delete_errors import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230217_server_key_hash import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230223_files import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230320_retry_state +import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230401_snd_files import Simplex.Messaging.Encoding.String import Simplex.Messaging.Parsers (dropPrefix, sumTypeJSON) import Simplex.Messaging.Transport.Client (TransportHost) @@ -80,7 +81,8 @@ schemaMigrations = ("m20230120_delete_errors", m20230120_delete_errors, Nothing), ("m20230217_server_key_hash", m20230217_server_key_hash, Nothing), ("m20230223_files", m20230223_files, Just down_m20230223_files), - ("m20230320_retry_state", m20230320_retry_state, Just down_m20230320_retry_state) + ("m20230320_retry_state", m20230320_retry_state, Just down_m20230320_retry_state), + ("m20230401_snd_files", m20230401_snd_files, Just down_m20230401_snd_files) ] -- | The list of migrations in ascending order by date diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230401_snd_files.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230401_snd_files.hs index df5754fb2..d75e21223 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230401_snd_files.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230401_snd_files.hs @@ -5,21 +5,22 @@ module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230401_snd_files where import Database.SQLite.Simple (Query) import Database.SQLite.Simple.QQ (sql) --- this migration is a draft - it is not included in the list of migrations m20230401_snd_files :: Query m20230401_snd_files = [sql| CREATE TABLE snd_files ( - snd_file_id INTEGER PRIMARY KEY AUTOINCREMENT, + snd_file_id INTEGER PRIMARY KEY, + snd_file_entity_id BLOB NOT NULL, user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE, - size INTEGER NOT NULL, - digest BLOB NOT NULL, - key BLOB NOT NULL, - nonce BLOB NOT NULL, - chunk_size INTEGER NOT NULL, + num_recipients INTEGER NOT NULL, + digest BLOB, + key BLOB NOT NUll, + nonce BLOB NOT NUll, path TEXT NOT NULL, - enc_path TEXT, + prefix_path TEXT, status TEXT NOT NULL, + deleted INTEGER NOT NULL DEFAULT 0, + error TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')), updated_at TEXT NOT NULL DEFAULT (datetime('now')) ); @@ -32,18 +33,13 @@ CREATE TABLE snd_file_chunks ( chunk_no INTEGER NOT NULL, chunk_offset INTEGER NOT NULL, chunk_size INTEGER NOT NULL, - digest BLOB NOT NULL, - delay INTEGER, + digest BLOB, created_at TEXT NOT NULL DEFAULT (datetime('now')), updated_at TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE INDEX idx_snd_file_chunks_snd_file_id ON snd_file_chunks(snd_file_id); --- ? add fk to snd_file_descriptions? --- ? probably it's not necessary since these entities are --- ? required at different stages of sending files - --- ? replicas on upload, description on notifying client CREATE TABLE snd_file_chunk_replicas ( snd_file_chunk_replica_id INTEGER PRIMARY KEY, snd_file_chunk_id INTEGER NOT NULL REFERENCES snd_file_chunks ON DELETE CASCADE, @@ -51,8 +47,8 @@ CREATE TABLE snd_file_chunk_replicas ( xftp_server_id INTEGER NOT NULL REFERENCES xftp_servers ON DELETE CASCADE, replica_id BLOB NOT NULL, replica_key BLOB NOT NULL, - -- created INTEGER NOT NULL DEFAULT 0, -- as in XFTP create - registered on server - uploaded INTEGER NOT NULL DEFAULT 0, + replica_status TEXT NOT NULL, + delay INTEGER, retries INTEGER NOT NULL DEFAULT 0, created_at TEXT NOT NULL DEFAULT (datetime('now')), updated_at TEXT NOT NULL DEFAULT (datetime('now')) @@ -72,14 +68,39 @@ CREATE TABLE snd_file_chunk_replica_recipients ( CREATE INDEX idx_snd_file_chunk_replica_recipients_snd_file_chunk_replica_id ON snd_file_chunk_replica_recipients(snd_file_chunk_replica_id); -CREATE TABLE snd_file_descriptions ( - snd_file_description_id INTEGER PRIMARY KEY, - snd_file_id INTEGER NOT NULL REFERENCES snd_files ON DELETE CASCADE, - sender INTEGER NOT NULL, -- 1 for sender file description - descr_text TEXT NOT NULL, +CREATE TABLE deleted_snd_chunk_replicas ( + deleted_snd_chunk_replica_id INTEGER PRIMARY KEY, + user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE, + xftp_server_id INTEGER NOT NULL REFERENCES xftp_servers ON DELETE CASCADE, + replica_id BLOB NOT NULL, + replica_key BLOB NOT NULL, + delay INTEGER, + retries INTEGER NOT NULL DEFAULT 0, created_at TEXT NOT NULL DEFAULT (datetime('now')), updated_at TEXT NOT NULL DEFAULT (datetime('now')) ); -CREATE INDEX idx_snd_file_descriptions_snd_file_id ON snd_file_descriptions(snd_file_id); +CREATE INDEX idx_deleted_snd_chunk_replicas_user_id ON deleted_snd_chunk_replicas(user_id); +CREATE INDEX idx_deleted_snd_chunk_replicas_xftp_server_id ON deleted_snd_chunk_replicas(xftp_server_id); +|] + +down_m20230401_snd_files :: Query +down_m20230401_snd_files = + [sql| +DROP INDEX idx_deleted_snd_chunk_replicas_xftp_server_id; +DROP INDEX idx_deleted_snd_chunk_replicas_user_id; +DROP TABLE deleted_snd_chunk_replicas; + +DROP INDEX idx_snd_file_chunk_replica_recipients_snd_file_chunk_replica_id; +DROP TABLE snd_file_chunk_replica_recipients; + +DROP INDEX idx_snd_file_chunk_replicas_snd_file_chunk_id; +DROP INDEX idx_snd_file_chunk_replicas_xftp_server_id; +DROP TABLE snd_file_chunk_replicas; + +DROP INDEX idx_snd_file_chunks_snd_file_id; +DROP TABLE snd_file_chunks; + +DROP INDEX idx_snd_files_user_id; +DROP TABLE snd_files; |] diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql index ee8cf9ff2..f2fa34218 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql @@ -345,3 +345,78 @@ CREATE INDEX idx_rcv_file_chunk_replicas_rcv_file_chunk_id ON rcv_file_chunk_rep CREATE INDEX idx_rcv_file_chunk_replicas_xftp_server_id ON rcv_file_chunk_replicas( xftp_server_id ); +CREATE TABLE snd_files( + snd_file_id INTEGER PRIMARY KEY, + snd_file_entity_id BLOB NOT NULL, + user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE, + num_recipients INTEGER NOT NULL, + digest BLOB, + key BLOB NOT NUll, + nonce BLOB NOT NUll, + path TEXT NOT NULL, + prefix_path TEXT, + status TEXT NOT NULL, + deleted INTEGER NOT NULL DEFAULT 0, + error TEXT, + created_at TEXT NOT NULL DEFAULT(datetime('now')), + updated_at TEXT NOT NULL DEFAULT(datetime('now')) +); +CREATE INDEX idx_snd_files_user_id ON snd_files(user_id); +CREATE TABLE snd_file_chunks( + snd_file_chunk_id INTEGER PRIMARY KEY, + snd_file_id INTEGER NOT NULL REFERENCES snd_files ON DELETE CASCADE, + chunk_no INTEGER NOT NULL, + chunk_offset INTEGER NOT NULL, + chunk_size INTEGER NOT NULL, + digest BLOB, + created_at TEXT NOT NULL DEFAULT(datetime('now')), + updated_at TEXT NOT NULL DEFAULT(datetime('now')) +); +CREATE INDEX idx_snd_file_chunks_snd_file_id ON snd_file_chunks(snd_file_id); +CREATE TABLE snd_file_chunk_replicas( + snd_file_chunk_replica_id INTEGER PRIMARY KEY, + snd_file_chunk_id INTEGER NOT NULL REFERENCES snd_file_chunks ON DELETE CASCADE, + replica_number INTEGER NOT NULL, + xftp_server_id INTEGER NOT NULL REFERENCES xftp_servers ON DELETE CASCADE, + replica_id BLOB NOT NULL, + replica_key BLOB NOT NULL, + replica_status TEXT NOT NULL, + delay INTEGER, + retries INTEGER NOT NULL DEFAULT 0, + created_at TEXT NOT NULL DEFAULT(datetime('now')), + updated_at TEXT NOT NULL DEFAULT(datetime('now')) +); +CREATE INDEX idx_snd_file_chunk_replicas_snd_file_chunk_id ON snd_file_chunk_replicas( + snd_file_chunk_id +); +CREATE INDEX idx_snd_file_chunk_replicas_xftp_server_id ON snd_file_chunk_replicas( + xftp_server_id +); +CREATE TABLE snd_file_chunk_replica_recipients( + snd_file_chunk_replica_recipient_id INTEGER PRIMARY KEY, + snd_file_chunk_replica_id INTEGER NOT NULL REFERENCES snd_file_chunk_replicas ON DELETE CASCADE, + rcv_replica_id BLOB NOT NULL, + rcv_replica_key BLOB NOT NULL, + created_at TEXT NOT NULL DEFAULT(datetime('now')), + updated_at TEXT NOT NULL DEFAULT(datetime('now')) +); +CREATE INDEX idx_snd_file_chunk_replica_recipients_snd_file_chunk_replica_id ON snd_file_chunk_replica_recipients( + snd_file_chunk_replica_id +); +CREATE TABLE deleted_snd_chunk_replicas( + deleted_snd_chunk_replica_id INTEGER PRIMARY KEY, + user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE, + xftp_server_id INTEGER NOT NULL REFERENCES xftp_servers ON DELETE CASCADE, + replica_id BLOB NOT NULL, + replica_key BLOB NOT NULL, + delay INTEGER, + retries INTEGER NOT NULL DEFAULT 0, + created_at TEXT NOT NULL DEFAULT(datetime('now')), + updated_at TEXT NOT NULL DEFAULT(datetime('now')) +); +CREATE INDEX idx_deleted_snd_chunk_replicas_user_id ON deleted_snd_chunk_replicas( + user_id +); +CREATE INDEX idx_deleted_snd_chunk_replicas_xftp_server_id ON deleted_snd_chunk_replicas( + xftp_server_id +); diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 47f423bbd..ea3bc918a 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -556,7 +556,7 @@ testSuspendingAgent = do 5 <- sendMessage a bId SMP.noMsgFlags "hello 2" get a ##> ("", bId, SENT 5) Nothing <- 100000 `timeout` get b - activateAgent b + foregroundAgent b get b =##> \case ("", c, Msg "hello 2") -> c == aId; _ -> False testSuspendingAgentCompleteSending :: ATransport -> IO () diff --git a/tests/XFTPAgent.hs b/tests/XFTPAgent.hs index fcc942f16..f57c88413 100644 --- a/tests/XFTPAgent.hs +++ b/tests/XFTPAgent.hs @@ -7,21 +7,24 @@ module XFTPAgent where import AgentTests.FunctionalAPITests (get, getSMPAgentClient', rfGet, runRight, runRight_, sfGet) +import Control.Concurrent (threadDelay) import Control.Logger.Simple import Control.Monad.Except import Data.Bifunctor (first) import qualified Data.ByteString.Char8 as B import Data.Int (Int64) +import Data.List (find, isSuffixOf) +import Data.Maybe (fromJust) import SMPAgentClient (agentCfg, initAgentServers, testDB) import Simplex.FileTransfer.Description import Simplex.FileTransfer.Protocol (FileParty (..), XFTPErrorType (AUTH)) import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..)) -import Simplex.Messaging.Agent (AgentClient, disconnectAgentClient, testProtocolServer, xftpDeleteRcvFile, xftpReceiveFile, xftpSendFile, xftpStartWorkers) +import Simplex.Messaging.Agent (AgentClient, disconnectAgentClient, testProtocolServer, xftpDeleteRcvFile, xftpDeleteSndFileInternal, xftpDeleteSndFileRemote, xftpReceiveFile, xftpSendFile, xftpStartWorkers) import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestStep (..)) -import Simplex.Messaging.Agent.Protocol (ACommand (..), AgentErrorType (..), BrokerErrorType (..), noAuthSrv) +import Simplex.Messaging.Agent.Protocol (ACommand (..), AgentErrorType (..), BrokerErrorType (..), RcvFileId, SndFileId, noAuthSrv) import Simplex.Messaging.Encoding.String (StrEncoding (..)) import Simplex.Messaging.Protocol (BasicAuth, ProtoServerWithAuth (..), ProtocolServer (..), XFTPServerWithAuth) -import System.Directory (doesDirectoryExist, getFileSize, listDirectory) +import System.Directory (doesDirectoryExist, doesFileExist, getFileSize, listDirectory) import System.FilePath (()) import System.Timeout (timeout) import Test.Hspec @@ -30,10 +33,13 @@ import XFTPClient xftpAgentTests :: Spec xftpAgentTests = around_ testBracket . describe "Functional API" $ do - it "should receive file" testXFTPAgentReceive + it "should send and receive file" testXFTPAgentSendReceive it "should resume receiving file after restart" testXFTPAgentReceiveRestore - it "should cleanup tmp path after permanent error" testXFTPAgentReceiveCleanup - it "should send file using experimental api" testXFTPAgentSendExperimental + it "should cleanup rcv tmp path after permanent error" testXFTPAgentReceiveCleanup + it "should resume sending file after restart" testXFTPAgentSendRestore + it "should cleanup snd prefix path after permanent error" testXFTPAgentSendCleanup + it "should delete sent file on server" testXFTPAgentDelete + it "should request additional recipient IDs when number of recipients exceeds maximum per request" testXFTPAgentRequestAdditionalRecipientIDs describe "XFTP server test via agent API" $ do it "should pass without basic auth" $ testXFTPServerTest Nothing (noAuthSrv testXFTPServer2) `shouldReturn` Nothing let srv1 = testXFTPServer2 {keyHash = "1234"} @@ -70,36 +76,54 @@ checkProgress (prev, expected) (progress, total) loop | progress < total = loop progress | otherwise = pure () -testXFTPAgentReceive :: IO () -testXFTPAgentReceive = withXFTPServer $ do - -- send file using CLI +testXFTPAgentSendReceive :: IO () +testXFTPAgentSendReceive = withXFTPServer $ do + filePath <- createRandomFile + + -- send file, delete snd file internally + sndr <- getSMPAgentClient' agentCfg initAgentServers testDB + (rfd1, rfd2) <- runRight $ do + (sfId, _, rfd1, rfd2) <- testSend sndr filePath + xftpDeleteSndFileInternal sndr 1 sfId + pure (rfd1, rfd2) + + -- receive file, delete rcv file + testReceiveDelete rfd1 filePath + testReceiveDelete rfd2 filePath + where + testReceiveDelete rfd originalFilePath = do + rcp <- getSMPAgentClient' agentCfg initAgentServers testDB + runRight_ $ do + rfId <- testReceive rcp rfd originalFilePath + xftpDeleteRcvFile rcp 1 rfId + +createRandomFile :: IO FilePath +createRandomFile = do let filePath = senderFiles "testfile" xftpCLI ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath] - file <- B.readFile filePath getFileSize filePath `shouldReturn` mb 17 - let fdRcv = filePath <> ".xftp" "rcv1.xftp" - fdSnd = filePath <> ".xftp" "snd.xftp.private" - progress : sendResult <- xftpCLI ["send", filePath, senderFiles, "-s", testXFTPServerStr, "--tmp=tests/tmp"] - progress `shouldSatisfy` uploadProgress - sendResult - `shouldBe` [ "Sender file description: " <> fdSnd, - "Pass file descriptions to the recipient(s):", - fdRcv - ] - -- receive file using agent - rcp <- getSMPAgentClient' agentCfg initAgentServers testDB - runRight_ $ do - xftpStartWorkers rcp (Just recipientFiles) - fd :: ValidFileDescription 'FRecipient <- getFileDescription fdRcv - fId <- xftpReceiveFile rcp 1 fd - rfProgress rcp $ mb 18 - ("", fId', RFDONE path) <- rfGet rcp - liftIO $ do - fId' `shouldBe` fId - B.readFile path `shouldReturn` file + pure filePath - -- delete file - xftpDeleteRcvFile rcp 1 fId +testSend :: AgentClient -> FilePath -> ExceptT AgentErrorType IO (SndFileId, ValidFileDescription 'FSender, ValidFileDescription 'FRecipient, ValidFileDescription 'FRecipient) +testSend sndr filePath = do + xftpStartWorkers sndr (Just senderFiles) + sfId <- xftpSendFile sndr 1 filePath 2 + sfProgress sndr $ mb 18 + ("", sfId', SFDONE sndDescr [rfd1, rfd2]) <- sfGet sndr + liftIO $ sfId' `shouldBe` sfId + pure (sfId, sndDescr, rfd1, rfd2) + +testReceive :: AgentClient -> ValidFileDescription 'FRecipient -> FilePath -> ExceptT AgentErrorType IO RcvFileId +testReceive rcp rfd originalFilePath = do + xftpStartWorkers rcp (Just recipientFiles) + rfId <- xftpReceiveFile rcp 1 rfd + rfProgress rcp $ mb 18 + ("", rfId', RFDONE path) <- rfGet rcp + liftIO $ do + rfId' `shouldBe` rfId + file <- B.readFile originalFilePath + B.readFile path `shouldReturn` file + pure rfId getFileDescription :: FilePath -> ExceptT AgentErrorType IO (ValidFileDescription 'FRecipient) getFileDescription path = @@ -110,30 +134,22 @@ logCfgNoLogs = LogConfig {lc_file = Nothing, lc_stderr = False} testXFTPAgentReceiveRestore :: IO () testXFTPAgentReceiveRestore = withGlobalLogging logCfgNoLogs $ do - let filePath = senderFiles "testfile" - fdRcv = filePath <> ".xftp" "rcv1.xftp" - fdSnd = filePath <> ".xftp" "snd.xftp.private" + filePath <- createRandomFile - withXFTPServerStoreLogOn $ \_ -> do - -- send file using CLI - xftpCLI ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath] - getFileSize filePath `shouldReturn` mb 17 - progress : sendResult <- xftpCLI ["send", filePath, senderFiles, "-s", testXFTPServerStr, "--tmp=tests/tmp"] - progress `shouldSatisfy` uploadProgress - sendResult - `shouldBe` [ "Sender file description: " <> fdSnd, - "Pass file descriptions to the recipient(s):", - fdRcv - ] + rfd <- withXFTPServerStoreLogOn $ \_ -> do + -- send file + sndr <- getSMPAgentClient' agentCfg initAgentServers testDB + runRight $ do + (_, _, rfd, _) <- testSend sndr filePath + pure rfd - -- receive file using agent - should not succeed due to server being down + -- receive file - should not succeed with server down rcp <- getSMPAgentClient' agentCfg initAgentServers testDB - fId <- runRight $ do + rfId <- runRight $ do xftpStartWorkers rcp (Just recipientFiles) - fd :: ValidFileDescription 'FRecipient <- getFileDescription fdRcv - fId <- xftpReceiveFile rcp 1 fd + rfId <- xftpReceiveFile rcp 1 rfd liftIO $ timeout 300000 (get rcp) `shouldReturn` Nothing -- wait for worker attempt - pure fId + pure rfId disconnectAgentClient rcp [prefixDir] <- listDirectory recipientFiles @@ -141,13 +157,23 @@ testXFTPAgentReceiveRestore = withGlobalLogging logCfgNoLogs $ do doesDirectoryExist tmpPath `shouldReturn` True withXFTPServerStoreLogOn $ \_ -> do - -- receive file using agent - should succeed with server up + -- receive file - should start downloading with server up + rcp' <- getSMPAgentClient' agentCfg initAgentServers testDB + runRight_ $ xftpStartWorkers rcp' (Just recipientFiles) + ("", rfId', RFPROG _ _) <- rfGet rcp' + liftIO $ rfId' `shouldBe` rfId + disconnectAgentClient rcp' + + threadDelay 100000 + + withXFTPServerStoreLogOn $ \_ -> do + -- receive file - should continue downloading with server up rcp' <- getSMPAgentClient' agentCfg initAgentServers testDB runRight_ $ xftpStartWorkers rcp' (Just recipientFiles) rfProgress rcp' $ mb 18 - ("", fId', RFDONE path) <- rfGet rcp' + ("", rfId', RFDONE path) <- rfGet rcp' liftIO $ do - fId' `shouldBe` fId + rfId' `shouldBe` rfId file <- B.readFile filePath B.readFile path `shouldReturn` file @@ -156,30 +182,22 @@ testXFTPAgentReceiveRestore = withGlobalLogging logCfgNoLogs $ do testXFTPAgentReceiveCleanup :: IO () testXFTPAgentReceiveCleanup = withGlobalLogging logCfgNoLogs $ do - let filePath = senderFiles "testfile" - fdRcv = filePath <> ".xftp" "rcv1.xftp" - fdSnd = filePath <> ".xftp" "snd.xftp.private" + filePath <- createRandomFile - withXFTPServerStoreLogOn $ \_ -> do - -- send file using CLI - xftpCLI ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath] - getFileSize filePath `shouldReturn` mb 17 - progress : sendResult <- xftpCLI ["send", filePath, senderFiles, "-s", testXFTPServerStr, "--tmp=tests/tmp"] - progress `shouldSatisfy` uploadProgress - sendResult - `shouldBe` [ "Sender file description: " <> fdSnd, - "Pass file descriptions to the recipient(s):", - fdRcv - ] + rfd <- withXFTPServerStoreLogOn $ \_ -> do + -- send file + sndr <- getSMPAgentClient' agentCfg initAgentServers testDB + runRight $ do + (_, _, rfd, _) <- testSend sndr filePath + pure rfd - -- receive file using agent - should not succeed due to server being down + -- receive file - should not succeed with server down rcp <- getSMPAgentClient' agentCfg initAgentServers testDB - fId <- runRight $ do + rfId <- runRight $ do xftpStartWorkers rcp (Just recipientFiles) - fd :: ValidFileDescription 'FRecipient <- getFileDescription fdRcv - fId <- xftpReceiveFile rcp 1 fd + rfId <- xftpReceiveFile rcp 1 rfd liftIO $ timeout 300000 (get rcp) `shouldReturn` Nothing -- wait for worker attempt - pure fId + pure rfId disconnectAgentClient rcp [prefixDir] <- listDirectory recipientFiles @@ -187,43 +205,159 @@ testXFTPAgentReceiveCleanup = withGlobalLogging logCfgNoLogs $ do doesDirectoryExist tmpPath `shouldReturn` True withXFTPServerThreadOn $ \_ -> do - -- receive file using agent - should fail with AUTH error + -- receive file - should fail with AUTH error rcp' <- getSMPAgentClient' agentCfg initAgentServers testDB runRight_ $ xftpStartWorkers rcp' (Just recipientFiles) - ("", fId', RFERR (INTERNAL "XFTP {xftpErr = AUTH}")) <- rfGet rcp' - fId' `shouldBe` fId + ("", rfId', RFERR (INTERNAL "XFTP {xftpErr = AUTH}")) <- rfGet rcp' + rfId' `shouldBe` rfId -- tmp path should be removed after permanent error doesDirectoryExist tmpPath `shouldReturn` False -testXFTPAgentSendExperimental :: IO () -testXFTPAgentSendExperimental = withXFTPServer $ do - -- create random file using cli - let filePath = senderFiles "testfile" - xftpCLI ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath] - file <- B.readFile filePath - getFileSize filePath `shouldReturn` mb 17 +testXFTPAgentSendRestore :: IO () +testXFTPAgentSendRestore = withGlobalLogging logCfgNoLogs $ do + filePath <- createRandomFile - -- send file using experimental agent API + -- send file - should not succeed with server down sndr <- getSMPAgentClient' agentCfg initAgentServers testDB - rfd <- runRight $ do + sfId <- runRight $ do xftpStartWorkers sndr (Just senderFiles) sfId <- xftpSendFile sndr 1 filePath 2 - sfProgress sndr $ mb 18 - ("", sfId', SFDONE _sndDescr [rfd1, _rfd2]) <- sfGet sndr - liftIO $ sfId' `shouldBe` sfId - pure rfd1 + liftIO $ timeout 1000000 (get sndr) `shouldReturn` Nothing -- wait for worker to encrypt and attempt to create file + pure sfId + disconnectAgentClient sndr - -- receive file using agent - rcp <- getSMPAgentClient' agentCfg initAgentServers testDB - runRight_ $ do - xftpStartWorkers rcp (Just recipientFiles) - rfId <- xftpReceiveFile rcp 1 rfd - rfProgress rcp $ mb 18 - ("", rfId', RFDONE path) <- rfGet rcp + dirEntries <- listDirectory senderFiles + let prefixDir = fromJust $ find (isSuffixOf "_snd.xftp") dirEntries + prefixPath = senderFiles prefixDir + encPath = prefixPath "xftp.encrypted" + doesDirectoryExist prefixPath `shouldReturn` True + doesFileExist encPath `shouldReturn` True + + withXFTPServerStoreLogOn $ \_ -> do + -- send file - should start uploading with server up + sndr' <- getSMPAgentClient' agentCfg initAgentServers testDB + runRight_ $ xftpStartWorkers sndr' (Just senderFiles) + ("", sfId', SFPROG _ _) <- sfGet sndr' + liftIO $ sfId' `shouldBe` sfId + disconnectAgentClient sndr' + + threadDelay 100000 + + withXFTPServerStoreLogOn $ \_ -> do + -- send file - should continue uploading with server up + sndr' <- getSMPAgentClient' agentCfg initAgentServers testDB + runRight_ $ xftpStartWorkers sndr' (Just senderFiles) + sfProgress sndr' $ mb 18 + ("", sfId', SFDONE _sndDescr [rfd1, _rfd2]) <- sfGet sndr' + liftIO $ sfId' `shouldBe` sfId + + -- prefix path should be removed after sending file + doesDirectoryExist prefixPath `shouldReturn` False + doesFileExist encPath `shouldReturn` False + + -- receive file + rcp <- getSMPAgentClient' agentCfg initAgentServers testDB + runRight_ $ + void $ testReceive rcp rfd1 filePath + +testXFTPAgentSendCleanup :: IO () +testXFTPAgentSendCleanup = withGlobalLogging logCfgNoLogs $ do + filePath <- createRandomFile + + sfId <- withXFTPServerStoreLogOn $ \_ -> do + -- send file + sndr <- getSMPAgentClient' agentCfg initAgentServers testDB + sfId <- runRight $ do + xftpStartWorkers sndr (Just senderFiles) + sfId <- xftpSendFile sndr 1 filePath 2 + -- wait for progress events for 5 out of 6 chunks - at this point all chunks should be created on the server + forM_ [1 .. 5 :: Integer] $ \_ -> do + (_, _, SFPROG _ _) <- sfGet sndr + pure () + pure sfId + disconnectAgentClient sndr + pure sfId + + dirEntries <- listDirectory senderFiles + let prefixDir = fromJust $ find (isSuffixOf "_snd.xftp") dirEntries + prefixPath = senderFiles prefixDir + encPath = prefixPath "xftp.encrypted" + doesDirectoryExist prefixPath `shouldReturn` True + doesFileExist encPath `shouldReturn` True + + withXFTPServerThreadOn $ \_ -> do + -- send file - should fail with AUTH error + sndr' <- getSMPAgentClient' agentCfg initAgentServers testDB + runRight_ $ xftpStartWorkers sndr' (Just senderFiles) + ("", sfId', SFERR (INTERNAL "XFTP {xftpErr = AUTH}")) <- sfGet sndr' + sfId' `shouldBe` sfId + + -- prefix path should be removed after permanent error + doesDirectoryExist prefixPath `shouldReturn` False + doesFileExist encPath `shouldReturn` False + +testXFTPAgentDelete :: IO () +testXFTPAgentDelete = withGlobalLogging logCfgNoLogs $ + withXFTPServer $ do + filePath <- createRandomFile + + -- send file + sndr <- getSMPAgentClient' agentCfg initAgentServers testDB + (sfId, sndDescr, rfd1, rfd2) <- runRight $ testSend sndr filePath + + -- receive file + rcp1 <- getSMPAgentClient' agentCfg initAgentServers testDB + runRight_ $ + void $ testReceive rcp1 rfd1 filePath + + length <$> listDirectory xftpServerFiles `shouldReturn` 6 + + -- delete file + runRight $ do + xftpStartWorkers sndr (Just senderFiles) + xftpDeleteSndFileRemote sndr 1 sfId sndDescr + Nothing <- liftIO $ 100000 `timeout` sfGet sndr + pure () + + threadDelay 1000000 + length <$> listDirectory xftpServerFiles `shouldReturn` 0 + + -- receive file - should fail with AUTH error + rcp2 <- getSMPAgentClient' agentCfg initAgentServers testDB + runRight $ do + xftpStartWorkers rcp2 (Just recipientFiles) + rfId <- xftpReceiveFile rcp2 1 rfd2 + ("", rfId', RFERR (INTERNAL "XFTP {xftpErr = AUTH}")) <- rfGet rcp2 + liftIO $ rfId' `shouldBe` rfId + +testXFTPAgentRequestAdditionalRecipientIDs :: IO () +testXFTPAgentRequestAdditionalRecipientIDs = withXFTPServer $ do + filePath <- createRandomFile + + -- send file + sndr <- getSMPAgentClient' agentCfg initAgentServers testDB + rfds <- runRight $ do + xftpStartWorkers sndr (Just senderFiles) + sfId <- xftpSendFile sndr 1 filePath 500 + sfProgress sndr $ mb 18 + ("", sfId', SFDONE _sndDescr rfds) <- sfGet sndr liftIO $ do - rfId' `shouldBe` rfId - B.readFile path `shouldReturn` file + sfId' `shouldBe` sfId + length rfds `shouldBe` 500 + pure rfds + + -- receive file using different descriptions + -- ! revise number of recipients and indexes if xftpMaxRecipientsPerRequest is changed + testReceive' (head rfds) filePath + testReceive' (rfds !! 99) filePath + testReceive' (rfds !! 299) filePath + testReceive' (rfds !! 499) filePath + where + testReceive' rfd originalFilePath = do + rcp <- getSMPAgentClient' agentCfg initAgentServers testDB + runRight_ $ + void $ testReceive rcp rfd originalFilePath testXFTPServerTest :: Maybe BasicAuth -> XFTPServerWithAuth -> IO (Maybe ProtocolTestFailure) testXFTPServerTest newFileBasicAuth srv =