From 050a921fbbdf21690cab7765bf6237fdc5a419cb Mon Sep 17 00:00:00 2001 From: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> Date: Mon, 26 Feb 2024 14:28:50 +0400 Subject: [PATCH] agent: batch db operations for deleting connections and xftp files (#1009) * agent: batch db operations for deleting connections * batch delete rcv files * snd files * refactor * refactor2 * lines * refactor * fix prefix path * refactor --------- Co-authored-by: Evgeny Poberezkin --- src/Simplex/FileTransfer/Agent.hs | 82 ++++++++++++++++++++----------- src/Simplex/Messaging/Agent.hs | 48 +++++++++++++----- 2 files changed, 89 insertions(+), 41 deletions(-) diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index 24bb9c789..9a789a104 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -7,7 +7,6 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE TupleSections #-} {-# LANGUAGE TypeApplications #-} {-# OPTIONS_GHC -fno-warn-ambiguous-fields #-} @@ -18,11 +17,14 @@ module Simplex.FileTransfer.Agent -- Receiving files xftpReceiveFile', xftpDeleteRcvFile', + xftpDeleteRcvFiles', -- Sending files xftpSendFile', xftpSendDescription', deleteSndFileInternal, + deleteSndFilesInternal, deleteSndFileRemote, + deleteSndFilesRemote, ) where @@ -30,14 +32,18 @@ import Control.Logger.Simple (logError) import Control.Monad import Control.Monad.Except import Control.Monad.Reader +import Data.Bifunctor (first) import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Lazy.Char8 as LB import Data.Composition ((.:)) +import Data.Either (rights) import Data.Int (Int64) -import Data.List (foldl', sortOn) +import Data.List (foldl', partition, sortOn) import qualified Data.List.NonEmpty as L import Data.Map (Map) import qualified Data.Map.Strict as M +import Data.Maybe (mapMaybe) +import qualified Data.Set as S import Data.Text (Text) import Data.Time.Clock (getCurrentTime) import Data.Time.Format (defaultTimeLocale, formatTime) @@ -55,6 +61,7 @@ import Simplex.Messaging.Agent.Env.SQLite import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval import Simplex.Messaging.Agent.Store.SQLite +import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Crypto.File (CryptoFile (..), CryptoFileArgs) import qualified Simplex.Messaging.Crypto.File as CF @@ -62,7 +69,7 @@ import qualified Simplex.Messaging.Crypto.Lazy as LC import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String (strDecode, strEncode) import Simplex.Messaging.Protocol (EntityId, XFTPServer) -import Simplex.Messaging.Util (liftError, tshow, unlessM, whenM) +import Simplex.Messaging.Util (catchAll_, liftError, tshow, unlessM, whenM) import System.FilePath (takeFileName, ()) import UnliftIO import UnliftIO.Directory @@ -287,17 +294,22 @@ runXFTPRcvLocalWorker c Worker {doWork} = do throwError $ INTERNAL "no chunk path" xftpDeleteRcvFile' :: AgentMonad m => AgentClient -> RcvFileId -> m () -xftpDeleteRcvFile' c rcvFileEntityId = do - rcvFile@RcvFile {rcvFileId} <- withStore c $ \db -> getRcvFileByEntityId db rcvFileEntityId - handleError (const $ pure ()) $ withStore' c (`getRcvFileRedirects` rcvFileId) >>= mapM_ remove - remove rcvFile +xftpDeleteRcvFile' c rcvFileEntityId = xftpDeleteRcvFiles' c [rcvFileEntityId] + +xftpDeleteRcvFiles' :: forall m. AgentMonad m => AgentClient -> [RcvFileId] -> m () +xftpDeleteRcvFiles' c rcvFileEntityIds = do + rcvFiles <- rights <$> withStoreBatch c (\db -> map (fmap (first storeError) . getRcvFileByEntityId db) rcvFileEntityIds) + redirects <- rights <$> batchFiles getRcvFileRedirects rcvFiles + let (toDelete, toMarkDeleted) = partition fileComplete $ concat redirects <> rcvFiles + void $ batchFiles deleteRcvFile' toDelete + void $ batchFiles updateRcvFileDeleted toMarkDeleted + workPath <- getXFTPWorkPath + liftIO . forM_ toDelete $ \RcvFile {prefixPath} -> + (removePath . (workPath )) prefixPath `catchAll_` pure () where - remove RcvFile {rcvFileId, prefixPath, status} = - if status == RFSComplete || status == RFSError - then do - removePath prefixPath - withStore' c (`deleteRcvFile'` rcvFileId) - else withStore' c (`updateRcvFileDeleted` rcvFileId) + fileComplete RcvFile {status} = status == RFSComplete || status == RFSError + batchFiles :: (DB.Connection -> DBRcvFileId -> IO a) -> [RcvFile] -> m [Either AgentErrorType a] + batchFiles f rcvFiles = withStoreBatch' c $ \db -> map (\RcvFile {rcvFileId} -> f db rcvFileId) rcvFiles notify :: forall m e. (MonadUnliftIO m, AEntityI e) => AgentClient -> EntityId -> ACommand 'Agent e -> m () notify c entId cmd = atomically $ writeTBQueue (subQ c) ("", entId, APC (sAEntity @e) cmd) @@ -546,24 +558,38 @@ runXFTPSndWorker c srv Worker {doWork} = do any (\SndFileChunkReplica {replicaStatus} -> replicaStatus == SFRSUploaded) replicas deleteSndFileInternal :: AgentMonad m => AgentClient -> SndFileId -> m () -deleteSndFileInternal c sndFileEntityId = do - SndFile {sndFileId, prefixPath, status} <- withStore c $ \db -> getSndFileByEntityId db sndFileEntityId - if status == SFSComplete || status == SFSError - then do - forM_ prefixPath $ removePath <=< toFSFilePath - withStore' c (`deleteSndFile'` sndFileId) - else withStore' c (`updateSndFileDeleted` sndFileId) +deleteSndFileInternal c sndFileEntityId = deleteSndFilesInternal c [sndFileEntityId] + +deleteSndFilesInternal :: forall m. AgentMonad m => AgentClient -> [SndFileId] -> m () +deleteSndFilesInternal c sndFileEntityIds = do + sndFiles <- rights <$> withStoreBatch c (\db -> map (fmap (first storeError) . getSndFileByEntityId db) sndFileEntityIds) + let (toDelete, toMarkDeleted) = partition fileComplete sndFiles + workPath <- getXFTPWorkPath + liftIO . forM_ toDelete $ \SndFile {prefixPath} -> + mapM_ (removePath . (workPath )) prefixPath `catchAll_` pure () + batchFiles_ deleteSndFile' toDelete + batchFiles_ updateSndFileDeleted toMarkDeleted + where + fileComplete SndFile {status} = status == SFSComplete || status == SFSError + batchFiles_ :: (DB.Connection -> DBSndFileId -> IO a) -> [SndFile] -> m () + batchFiles_ f sndFiles = void $ withStoreBatch' c $ \db -> map (\SndFile {sndFileId} -> f db sndFileId) sndFiles deleteSndFileRemote :: forall m. AgentMonad m => AgentClient -> UserId -> SndFileId -> ValidFileDescription 'FSender -> m () -deleteSndFileRemote c userId sndFileEntityId (ValidFileDescription FileDescription {chunks}) = do - deleteSndFileInternal c sndFileEntityId `catchAgentError` (notify c sndFileEntityId . SFERR) - forM_ chunks $ \ch -> deleteFileChunk ch `catchAgentError` (notify c sndFileEntityId . SFERR) +deleteSndFileRemote c userId sndFileEntityId sfd = deleteSndFilesRemote c userId [(sndFileEntityId, sfd)] + +deleteSndFilesRemote :: forall m. AgentMonad m => AgentClient -> UserId -> [(SndFileId, ValidFileDescription 'FSender)] -> m () +deleteSndFilesRemote c userId sndFileIdsDescrs = do + deleteSndFilesInternal c (map fst sndFileIdsDescrs) `catchAgentError` (notify c "" . SFERR) + let rs = concatMap (mapMaybe chunkReplica . fdChunks . snd) sndFileIdsDescrs + void $ withStoreBatch' c (\db -> map (uncurry $ createDeletedSndChunkReplica db userId) rs) + let servers = S.fromList $ map (\(FileChunkReplica {server}, _) -> server) rs + mapM_ (getXFTPDelWorker True c) servers where - deleteFileChunk :: FileChunk -> m () - deleteFileChunk FileChunk {digest, replicas = replica@FileChunkReplica {server} : _} = do - withStore' c $ \db -> createDeletedSndChunkReplica db userId replica digest - void $ getXFTPDelWorker True c server - deleteFileChunk _ = pure () + fdChunks (ValidFileDescription FileDescription {chunks}) = chunks + chunkReplica :: FileChunk -> Maybe (FileChunkReplica, FileDigest) + chunkReplica = \case + FileChunk {digest, replicas = replica : _} -> Just (replica, digest) + _ -> Nothing resumeXFTPDelWork :: AgentMonad' m => AgentClient -> XFTPServer -> m () resumeXFTPDelWork = void .: getXFTPDelWorker False diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index d6d1e9800..aa9872e9c 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -92,10 +92,13 @@ module Simplex.Messaging.Agent xftpStartWorkers, xftpReceiveFile, xftpDeleteRcvFile, + xftpDeleteRcvFiles, xftpSendFile, xftpSendDescription, xftpDeleteSndFileInternal, + xftpDeleteSndFilesInternal, xftpDeleteSndFileRemote, + xftpDeleteSndFilesRemote, rcNewHostPairing, rcConnectHost, rcConnectCtrl, @@ -123,7 +126,7 @@ import Data.Bifunctor (bimap, first, second) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Composition ((.:), (.:.), (.::), (.::.)) -import Data.Either (rights) +import Data.Either (isRight, rights) import Data.Foldable (foldl', toList) import Data.Functor (($>)) import Data.Functor.Identity @@ -138,7 +141,7 @@ import qualified Data.Text as T import Data.Time.Clock import Data.Time.Clock.System (systemToUTCTime) import Data.Word (Word16) -import Simplex.FileTransfer.Agent (closeXFTPAgent, deleteSndFileInternal, deleteSndFileRemote, startXFTPWorkers, toFSFilePath, xftpDeleteRcvFile', xftpReceiveFile', xftpSendDescription', xftpSendFile') +import Simplex.FileTransfer.Agent (closeXFTPAgent, deleteSndFileInternal, deleteSndFilesInternal, deleteSndFileRemote, deleteSndFilesRemote, startXFTPWorkers, toFSFilePath, xftpDeleteRcvFile', xftpDeleteRcvFiles', xftpReceiveFile', xftpSendDescription', xftpSendFile') import Simplex.FileTransfer.Description (ValidFileDescription) import Simplex.FileTransfer.Protocol (FileParty (..)) import Simplex.FileTransfer.Util (removePath) @@ -165,8 +168,8 @@ import Simplex.Messaging.Parsers (parse) import Simplex.Messaging.Protocol (BrokerMsg, EntityId, ErrorType (AUTH), MsgBody, MsgFlags (..), NtfServer, ProtoServerWithAuth, ProtocolTypeI (..), SMPMsgMeta, SProtocolType (..), SndPublicAuthKey, SubscriptionMode (..), UserProtocol, XFTPServerWithAuth) import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.ServiceScheme (ServiceScheme (..)) -import Simplex.Messaging.Transport (THandleParams (sessionId)) import qualified Simplex.Messaging.TMap as TM +import Simplex.Messaging.Transport (THandleParams (sessionId)) import Simplex.Messaging.Util import Simplex.Messaging.Version import Simplex.RemoteControl.Client @@ -242,7 +245,7 @@ switchConnectionAsync c = withAgentEnv c .: switchConnectionAsync' c deleteConnectionAsync :: AgentErrorMonad m => AgentClient -> ConnId -> m () deleteConnectionAsync c = withAgentEnv c . deleteConnectionAsync' c --- -- | Delete SMP agent connections using batch commands asynchronously, no synchronous response +-- | Delete SMP agent connections using batch commands asynchronously, no synchronous response deleteConnectionsAsync :: AgentErrorMonad m => AgentClient -> [ConnId] -> m () deleteConnectionsAsync c = withAgentEnv c . deleteConnectionsAsync' c @@ -400,6 +403,10 @@ xftpReceiveFile c = withAgentEnv c .:. xftpReceiveFile' c xftpDeleteRcvFile :: AgentErrorMonad m => AgentClient -> RcvFileId -> m () xftpDeleteRcvFile c = withAgentEnv c . xftpDeleteRcvFile' c +-- | Delete multiple rcv files, batching operations when possible (deletes work files from file system and db records) +xftpDeleteRcvFiles :: AgentErrorMonad m => AgentClient -> [RcvFileId] -> m () +xftpDeleteRcvFiles c = withAgentEnv c . xftpDeleteRcvFiles' c + -- | Send XFTP file xftpSendFile :: AgentErrorMonad m => AgentClient -> UserId -> CryptoFile -> Int -> m SndFileId xftpSendFile c = withAgentEnv c .:. xftpSendFile' c @@ -412,10 +419,18 @@ xftpSendDescription c = withAgentEnv c .:. xftpSendDescription' c xftpDeleteSndFileInternal :: AgentErrorMonad m => AgentClient -> SndFileId -> m () xftpDeleteSndFileInternal c = withAgentEnv c . deleteSndFileInternal c +-- | Delete multiple snd files internally, batching operations when possible (deletes work files from file system and db records) +xftpDeleteSndFilesInternal :: AgentErrorMonad m => AgentClient -> [SndFileId] -> m () +xftpDeleteSndFilesInternal c = withAgentEnv c . deleteSndFilesInternal c + -- | Delete XFTP snd file chunks on servers xftpDeleteSndFileRemote :: AgentErrorMonad m => AgentClient -> UserId -> SndFileId -> ValidFileDescription 'FSender -> m () xftpDeleteSndFileRemote c = withAgentEnv c .:. deleteSndFileRemote c +-- | Delete XFTP snd file chunks on servers for multiple snd files, batching operations when possible +xftpDeleteSndFilesRemote :: AgentErrorMonad m => AgentClient -> UserId -> [(SndFileId, ValidFileDescription 'FSender)] -> m () +xftpDeleteSndFilesRemote c = withAgentEnv c .: deleteSndFilesRemote c + -- | Create new remote host pairing rcNewHostPairing :: AgentErrorMonad m => AgentClient -> m RCHostPairing rcNewHostPairing c = withAgentEnv c $ liftIO . newRCHostPairing =<< asks random @@ -1468,21 +1483,28 @@ prepareDeleteConnections_ getConnections c connIds = do deleteConnQueues :: forall m. AgentMonad m => AgentClient -> Bool -> [RcvQueue] -> m (Map ConnId (Either AgentErrorType ())) deleteConnQueues c ntf rqs = do rs <- connResults <$> (deleteQueueRecs =<< deleteQueues c rqs) - forM_ (M.assocs rs) $ \case - (connId, Right _) -> withStore' c (`deleteConn` connId) >> notify ("", connId, APC SAEConn DEL_CONN) - _ -> pure () + let connIds = M.keys $ M.filter isRight rs + rs' <- rights <$> withStoreBatch' c (\db -> map (\cId -> deleteConn db cId $> cId) connIds) + forM_ rs' $ \cId -> notify ("", cId, APC SAEConn DEL_CONN) pure rs where deleteQueueRecs :: [(RcvQueue, Either AgentErrorType ())] -> m [(RcvQueue, Either AgentErrorType ())] deleteQueueRecs rs = do maxErrs <- asks $ deleteErrorCount . config - forM rs $ \(rq, r) -> do - r' <- case r of - Right _ -> withStore' c (`deleteConnRcvQueue` rq) >> notifyRQ rq Nothing $> r + (rs', notifyActions) <- unzip . rights <$> withStoreBatch' c (\db -> map (deleteQueueRec db maxErrs) rs) + mapM_ sequence_ notifyActions + pure rs' + where + deleteQueueRec :: + DB.Connection -> + Int -> + (RcvQueue, Either AgentErrorType ()) -> + IO ((RcvQueue, Either AgentErrorType ()), Maybe (m ())) + deleteQueueRec db maxErrs (rq, r) = case r of + Right _ -> deleteConnRcvQueue db rq $> ((rq, r), Just (notifyRQ rq Nothing)) Left e - | temporaryOrHostError e && deleteErrors rq + 1 < maxErrs -> withStore' c (`incRcvDeleteErrors` rq) $> r - | otherwise -> withStore' c (`deleteConnRcvQueue` rq) >> notifyRQ rq (Just e) $> Right () - pure (rq, r') + | temporaryOrHostError e && deleteErrors rq + 1 < maxErrs -> incRcvDeleteErrors db rq $> ((rq, r), Nothing) + | otherwise -> deleteConnRcvQueue db rq $> ((rq, Right ()), Just (notifyRQ rq (Just e))) notifyRQ rq e_ = notify ("", qConnId rq, APC SAEConn $ DEL_RCVQ (qServer rq) (queueId rq) e_) notify = when ntf . atomically . writeTBQueue (subQ c) connResults :: [(RcvQueue, Either AgentErrorType ())] -> Map ConnId (Either AgentErrorType ())