agent: batch db operations for deleting connections and xftp files (#1009)

* agent: batch db operations for deleting connections

* batch delete rcv files

* snd files

* refactor

* refactor2

* lines

* refactor

* fix prefix path

* refactor

---------

Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>
This commit is contained in:
spaced4ndy
2024-02-26 14:28:50 +04:00
committed by GitHub
parent e2ec737c68
commit 050a921fbb
2 changed files with 89 additions and 41 deletions
+54 -28
View File
@@ -7,7 +7,6 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
@@ -18,11 +17,14 @@ module Simplex.FileTransfer.Agent
-- Receiving files
xftpReceiveFile',
xftpDeleteRcvFile',
xftpDeleteRcvFiles',
-- Sending files
xftpSendFile',
xftpSendDescription',
deleteSndFileInternal,
deleteSndFilesInternal,
deleteSndFileRemote,
deleteSndFilesRemote,
)
where
@@ -30,14 +32,18 @@ import Control.Logger.Simple (logError)
import Control.Monad
import Control.Monad.Except
import Control.Monad.Reader
import Data.Bifunctor (first)
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as LB
import Data.Composition ((.:))
import Data.Either (rights)
import Data.Int (Int64)
import Data.List (foldl', sortOn)
import Data.List (foldl', partition, sortOn)
import qualified Data.List.NonEmpty as L
import Data.Map (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (mapMaybe)
import qualified Data.Set as S
import Data.Text (Text)
import Data.Time.Clock (getCurrentTime)
import Data.Time.Format (defaultTimeLocale, formatTime)
@@ -55,6 +61,7 @@ import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Store.SQLite
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.File (CryptoFile (..), CryptoFileArgs)
import qualified Simplex.Messaging.Crypto.File as CF
@@ -62,7 +69,7 @@ import qualified Simplex.Messaging.Crypto.Lazy as LC
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String (strDecode, strEncode)
import Simplex.Messaging.Protocol (EntityId, XFTPServer)
import Simplex.Messaging.Util (liftError, tshow, unlessM, whenM)
import Simplex.Messaging.Util (catchAll_, liftError, tshow, unlessM, whenM)
import System.FilePath (takeFileName, (</>))
import UnliftIO
import UnliftIO.Directory
@@ -287,17 +294,22 @@ runXFTPRcvLocalWorker c Worker {doWork} = do
throwError $ INTERNAL "no chunk path"
xftpDeleteRcvFile' :: AgentMonad m => AgentClient -> RcvFileId -> m ()
xftpDeleteRcvFile' c rcvFileEntityId = do
rcvFile@RcvFile {rcvFileId} <- withStore c $ \db -> getRcvFileByEntityId db rcvFileEntityId
handleError (const $ pure ()) $ withStore' c (`getRcvFileRedirects` rcvFileId) >>= mapM_ remove
remove rcvFile
xftpDeleteRcvFile' c rcvFileEntityId = xftpDeleteRcvFiles' c [rcvFileEntityId]
xftpDeleteRcvFiles' :: forall m. AgentMonad m => AgentClient -> [RcvFileId] -> m ()
xftpDeleteRcvFiles' c rcvFileEntityIds = do
rcvFiles <- rights <$> withStoreBatch c (\db -> map (fmap (first storeError) . getRcvFileByEntityId db) rcvFileEntityIds)
redirects <- rights <$> batchFiles getRcvFileRedirects rcvFiles
let (toDelete, toMarkDeleted) = partition fileComplete $ concat redirects <> rcvFiles
void $ batchFiles deleteRcvFile' toDelete
void $ batchFiles updateRcvFileDeleted toMarkDeleted
workPath <- getXFTPWorkPath
liftIO . forM_ toDelete $ \RcvFile {prefixPath} ->
(removePath . (workPath </>)) prefixPath `catchAll_` pure ()
where
remove RcvFile {rcvFileId, prefixPath, status} =
if status == RFSComplete || status == RFSError
then do
removePath prefixPath
withStore' c (`deleteRcvFile'` rcvFileId)
else withStore' c (`updateRcvFileDeleted` rcvFileId)
fileComplete RcvFile {status} = status == RFSComplete || status == RFSError
batchFiles :: (DB.Connection -> DBRcvFileId -> IO a) -> [RcvFile] -> m [Either AgentErrorType a]
batchFiles f rcvFiles = withStoreBatch' c $ \db -> map (\RcvFile {rcvFileId} -> f db rcvFileId) rcvFiles
notify :: forall m e. (MonadUnliftIO m, AEntityI e) => AgentClient -> EntityId -> ACommand 'Agent e -> m ()
notify c entId cmd = atomically $ writeTBQueue (subQ c) ("", entId, APC (sAEntity @e) cmd)
@@ -546,24 +558,38 @@ runXFTPSndWorker c srv Worker {doWork} = do
any (\SndFileChunkReplica {replicaStatus} -> replicaStatus == SFRSUploaded) replicas
deleteSndFileInternal :: AgentMonad m => AgentClient -> SndFileId -> m ()
deleteSndFileInternal c sndFileEntityId = do
SndFile {sndFileId, prefixPath, status} <- withStore c $ \db -> getSndFileByEntityId db sndFileEntityId
if status == SFSComplete || status == SFSError
then do
forM_ prefixPath $ removePath <=< toFSFilePath
withStore' c (`deleteSndFile'` sndFileId)
else withStore' c (`updateSndFileDeleted` sndFileId)
deleteSndFileInternal c sndFileEntityId = deleteSndFilesInternal c [sndFileEntityId]
deleteSndFilesInternal :: forall m. AgentMonad m => AgentClient -> [SndFileId] -> m ()
deleteSndFilesInternal c sndFileEntityIds = do
sndFiles <- rights <$> withStoreBatch c (\db -> map (fmap (first storeError) . getSndFileByEntityId db) sndFileEntityIds)
let (toDelete, toMarkDeleted) = partition fileComplete sndFiles
workPath <- getXFTPWorkPath
liftIO . forM_ toDelete $ \SndFile {prefixPath} ->
mapM_ (removePath . (workPath </>)) prefixPath `catchAll_` pure ()
batchFiles_ deleteSndFile' toDelete
batchFiles_ updateSndFileDeleted toMarkDeleted
where
fileComplete SndFile {status} = status == SFSComplete || status == SFSError
batchFiles_ :: (DB.Connection -> DBSndFileId -> IO a) -> [SndFile] -> m ()
batchFiles_ f sndFiles = void $ withStoreBatch' c $ \db -> map (\SndFile {sndFileId} -> f db sndFileId) sndFiles
deleteSndFileRemote :: forall m. AgentMonad m => AgentClient -> UserId -> SndFileId -> ValidFileDescription 'FSender -> m ()
deleteSndFileRemote c userId sndFileEntityId (ValidFileDescription FileDescription {chunks}) = do
deleteSndFileInternal c sndFileEntityId `catchAgentError` (notify c sndFileEntityId . SFERR)
forM_ chunks $ \ch -> deleteFileChunk ch `catchAgentError` (notify c sndFileEntityId . SFERR)
deleteSndFileRemote c userId sndFileEntityId sfd = deleteSndFilesRemote c userId [(sndFileEntityId, sfd)]
deleteSndFilesRemote :: forall m. AgentMonad m => AgentClient -> UserId -> [(SndFileId, ValidFileDescription 'FSender)] -> m ()
deleteSndFilesRemote c userId sndFileIdsDescrs = do
deleteSndFilesInternal c (map fst sndFileIdsDescrs) `catchAgentError` (notify c "" . SFERR)
let rs = concatMap (mapMaybe chunkReplica . fdChunks . snd) sndFileIdsDescrs
void $ withStoreBatch' c (\db -> map (uncurry $ createDeletedSndChunkReplica db userId) rs)
let servers = S.fromList $ map (\(FileChunkReplica {server}, _) -> server) rs
mapM_ (getXFTPDelWorker True c) servers
where
deleteFileChunk :: FileChunk -> m ()
deleteFileChunk FileChunk {digest, replicas = replica@FileChunkReplica {server} : _} = do
withStore' c $ \db -> createDeletedSndChunkReplica db userId replica digest
void $ getXFTPDelWorker True c server
deleteFileChunk _ = pure ()
fdChunks (ValidFileDescription FileDescription {chunks}) = chunks
chunkReplica :: FileChunk -> Maybe (FileChunkReplica, FileDigest)
chunkReplica = \case
FileChunk {digest, replicas = replica : _} -> Just (replica, digest)
_ -> Nothing
resumeXFTPDelWork :: AgentMonad' m => AgentClient -> XFTPServer -> m ()
resumeXFTPDelWork = void .: getXFTPDelWorker False
+35 -13
View File
@@ -92,10 +92,13 @@ module Simplex.Messaging.Agent
xftpStartWorkers,
xftpReceiveFile,
xftpDeleteRcvFile,
xftpDeleteRcvFiles,
xftpSendFile,
xftpSendDescription,
xftpDeleteSndFileInternal,
xftpDeleteSndFilesInternal,
xftpDeleteSndFileRemote,
xftpDeleteSndFilesRemote,
rcNewHostPairing,
rcConnectHost,
rcConnectCtrl,
@@ -123,7 +126,7 @@ import Data.Bifunctor (bimap, first, second)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Composition ((.:), (.:.), (.::), (.::.))
import Data.Either (rights)
import Data.Either (isRight, rights)
import Data.Foldable (foldl', toList)
import Data.Functor (($>))
import Data.Functor.Identity
@@ -138,7 +141,7 @@ import qualified Data.Text as T
import Data.Time.Clock
import Data.Time.Clock.System (systemToUTCTime)
import Data.Word (Word16)
import Simplex.FileTransfer.Agent (closeXFTPAgent, deleteSndFileInternal, deleteSndFileRemote, startXFTPWorkers, toFSFilePath, xftpDeleteRcvFile', xftpReceiveFile', xftpSendDescription', xftpSendFile')
import Simplex.FileTransfer.Agent (closeXFTPAgent, deleteSndFileInternal, deleteSndFilesInternal, deleteSndFileRemote, deleteSndFilesRemote, startXFTPWorkers, toFSFilePath, xftpDeleteRcvFile', xftpDeleteRcvFiles', xftpReceiveFile', xftpSendDescription', xftpSendFile')
import Simplex.FileTransfer.Description (ValidFileDescription)
import Simplex.FileTransfer.Protocol (FileParty (..))
import Simplex.FileTransfer.Util (removePath)
@@ -165,8 +168,8 @@ import Simplex.Messaging.Parsers (parse)
import Simplex.Messaging.Protocol (BrokerMsg, EntityId, ErrorType (AUTH), MsgBody, MsgFlags (..), NtfServer, ProtoServerWithAuth, ProtocolTypeI (..), SMPMsgMeta, SProtocolType (..), SndPublicAuthKey, SubscriptionMode (..), UserProtocol, XFTPServerWithAuth)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.ServiceScheme (ServiceScheme (..))
import Simplex.Messaging.Transport (THandleParams (sessionId))
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (THandleParams (sessionId))
import Simplex.Messaging.Util
import Simplex.Messaging.Version
import Simplex.RemoteControl.Client
@@ -242,7 +245,7 @@ switchConnectionAsync c = withAgentEnv c .: switchConnectionAsync' c
deleteConnectionAsync :: AgentErrorMonad m => AgentClient -> ConnId -> m ()
deleteConnectionAsync c = withAgentEnv c . deleteConnectionAsync' c
-- -- | Delete SMP agent connections using batch commands asynchronously, no synchronous response
-- | Delete SMP agent connections using batch commands asynchronously, no synchronous response
deleteConnectionsAsync :: AgentErrorMonad m => AgentClient -> [ConnId] -> m ()
deleteConnectionsAsync c = withAgentEnv c . deleteConnectionsAsync' c
@@ -400,6 +403,10 @@ xftpReceiveFile c = withAgentEnv c .:. xftpReceiveFile' c
xftpDeleteRcvFile :: AgentErrorMonad m => AgentClient -> RcvFileId -> m ()
xftpDeleteRcvFile c = withAgentEnv c . xftpDeleteRcvFile' c
-- | Delete multiple rcv files, batching operations when possible (deletes work files from file system and db records)
xftpDeleteRcvFiles :: AgentErrorMonad m => AgentClient -> [RcvFileId] -> m ()
xftpDeleteRcvFiles c = withAgentEnv c . xftpDeleteRcvFiles' c
-- | Send XFTP file
xftpSendFile :: AgentErrorMonad m => AgentClient -> UserId -> CryptoFile -> Int -> m SndFileId
xftpSendFile c = withAgentEnv c .:. xftpSendFile' c
@@ -412,10 +419,18 @@ xftpSendDescription c = withAgentEnv c .:. xftpSendDescription' c
xftpDeleteSndFileInternal :: AgentErrorMonad m => AgentClient -> SndFileId -> m ()
xftpDeleteSndFileInternal c = withAgentEnv c . deleteSndFileInternal c
-- | Delete multiple snd files internally, batching operations when possible (deletes work files from file system and db records)
xftpDeleteSndFilesInternal :: AgentErrorMonad m => AgentClient -> [SndFileId] -> m ()
xftpDeleteSndFilesInternal c = withAgentEnv c . deleteSndFilesInternal c
-- | Delete XFTP snd file chunks on servers
xftpDeleteSndFileRemote :: AgentErrorMonad m => AgentClient -> UserId -> SndFileId -> ValidFileDescription 'FSender -> m ()
xftpDeleteSndFileRemote c = withAgentEnv c .:. deleteSndFileRemote c
-- | Delete XFTP snd file chunks on servers for multiple snd files, batching operations when possible
xftpDeleteSndFilesRemote :: AgentErrorMonad m => AgentClient -> UserId -> [(SndFileId, ValidFileDescription 'FSender)] -> m ()
xftpDeleteSndFilesRemote c = withAgentEnv c .: deleteSndFilesRemote c
-- | Create new remote host pairing
rcNewHostPairing :: AgentErrorMonad m => AgentClient -> m RCHostPairing
rcNewHostPairing c = withAgentEnv c $ liftIO . newRCHostPairing =<< asks random
@@ -1468,21 +1483,28 @@ prepareDeleteConnections_ getConnections c connIds = do
deleteConnQueues :: forall m. AgentMonad m => AgentClient -> Bool -> [RcvQueue] -> m (Map ConnId (Either AgentErrorType ()))
deleteConnQueues c ntf rqs = do
rs <- connResults <$> (deleteQueueRecs =<< deleteQueues c rqs)
forM_ (M.assocs rs) $ \case
(connId, Right _) -> withStore' c (`deleteConn` connId) >> notify ("", connId, APC SAEConn DEL_CONN)
_ -> pure ()
let connIds = M.keys $ M.filter isRight rs
rs' <- rights <$> withStoreBatch' c (\db -> map (\cId -> deleteConn db cId $> cId) connIds)
forM_ rs' $ \cId -> notify ("", cId, APC SAEConn DEL_CONN)
pure rs
where
deleteQueueRecs :: [(RcvQueue, Either AgentErrorType ())] -> m [(RcvQueue, Either AgentErrorType ())]
deleteQueueRecs rs = do
maxErrs <- asks $ deleteErrorCount . config
forM rs $ \(rq, r) -> do
r' <- case r of
Right _ -> withStore' c (`deleteConnRcvQueue` rq) >> notifyRQ rq Nothing $> r
(rs', notifyActions) <- unzip . rights <$> withStoreBatch' c (\db -> map (deleteQueueRec db maxErrs) rs)
mapM_ sequence_ notifyActions
pure rs'
where
deleteQueueRec ::
DB.Connection ->
Int ->
(RcvQueue, Either AgentErrorType ()) ->
IO ((RcvQueue, Either AgentErrorType ()), Maybe (m ()))
deleteQueueRec db maxErrs (rq, r) = case r of
Right _ -> deleteConnRcvQueue db rq $> ((rq, r), Just (notifyRQ rq Nothing))
Left e
| temporaryOrHostError e && deleteErrors rq + 1 < maxErrs -> withStore' c (`incRcvDeleteErrors` rq) $> r
| otherwise -> withStore' c (`deleteConnRcvQueue` rq) >> notifyRQ rq (Just e) $> Right ()
pure (rq, r')
| temporaryOrHostError e && deleteErrors rq + 1 < maxErrs -> incRcvDeleteErrors db rq $> ((rq, r), Nothing)
| otherwise -> deleteConnRcvQueue db rq $> ((rq, Right ()), Just (notifyRQ rq (Just e)))
notifyRQ rq e_ = notify ("", qConnId rq, APC SAEConn $ DEL_RCVQ (qServer rq) (queueId rq) e_)
notify = when ntf . atomically . writeTBQueue (subQ c)
connResults :: [(RcvQueue, Either AgentErrorType ())] -> Map ConnId (Either AgentErrorType ())