From 135bdf38422ea2840f3f6abaa28235d57ac6771f Mon Sep 17 00:00:00 2001 From: JRoberts <8711996+jr-simplex@users.noreply.github.com> Date: Wed, 5 Oct 2022 19:54:28 +0400 Subject: [PATCH] core: optimize bulk chat item deletion 2 (#1172) --- simplex-chat.cabal | 1 + src/Simplex/Chat.hs | 62 ++++--- src/Simplex/Chat/Messages.hs | 2 +- ...M20221004_idx_msg_deliveries_message_id.hs | 12 ++ src/Simplex/Chat/Migrations/chat_schema.sql | 1 + src/Simplex/Chat/Store.hs | 153 ++++++------------ tests/ChatTests.hs | 5 +- 7 files changed, 115 insertions(+), 121 deletions(-) create mode 100644 src/Simplex/Chat/Migrations/M20221004_idx_msg_deliveries_message_id.hs diff --git a/simplex-chat.cabal b/simplex-chat.cabal index 615a4a84cd..f128c81c0f 100644 --- a/simplex-chat.cabal +++ b/simplex-chat.cabal @@ -53,6 +53,7 @@ library Simplex.Chat.Migrations.M20220928_settings Simplex.Chat.Migrations.M20221001_shared_msg_id_indices Simplex.Chat.Migrations.M20221003_delete_broken_integrity_error_chat_items + Simplex.Chat.Migrations.M20221004_idx_msg_deliveries_message_id Simplex.Chat.Mobile Simplex.Chat.Options Simplex.Chat.ProfileGenerator diff --git a/src/Simplex/Chat.hs b/src/Simplex/Chat.hs index afc7dcc77a..7179aea83a 100644 --- a/src/Simplex/Chat.hs +++ b/src/Simplex/Chat.hs @@ -39,7 +39,7 @@ import qualified Data.Map.Strict as M import Data.Maybe (fromMaybe, isJust, isNothing, mapMaybe) import Data.Text (Text) import qualified Data.Text as T -import Data.Time (addUTCTime) +import Data.Time (NominalDiffTime, addUTCTime) import Data.Time.Clock (UTCTime, diffUTCTime, getCurrentTime, nominalDiffTimeToSeconds) import Data.Time.Clock.System (SystemTime, systemToUTCTime) import Data.Time.LocalTime (getCurrentTimeZone, getZonedTime) @@ -449,7 +449,7 @@ processChatCommand = \case deleteCIFile :: MsgDirectionI d => User -> Maybe (CIFile d) -> m () deleteCIFile user file = forM_ file $ \CIFile {fileId, filePath, fileStatus} -> do - let fileInfo = CIFileInfo {fileId, fileStatus = AFS msgDirection fileStatus, filePath} + let fileInfo = CIFileInfo {fileId, fileStatus = Just $ AFS msgDirection fileStatus, filePath} deleteFile user fileInfo APIChatRead (ChatRef cType chatId) fromToIds -> withChatLock $ case cType of CTDirect -> withStore' (\db -> updateDirectChatItemsRead db chatId fromToIds) $> CRCmdOk @@ -1123,17 +1123,18 @@ setExpireCIs b = do atomically $ writeTVar expire b deleteFile :: forall m. ChatMonad m => User -> CIFileInfo -> m () -deleteFile user CIFileInfo {filePath, fileId, fileStatus = (AFS dir status)} = +deleteFile user CIFileInfo {filePath, fileId, fileStatus} = cancel' >> delete where - cancel' = unless (ciFileEnded status) $ - case dir of - SMDSnd -> do - (ftm@FileTransferMeta {cancelled}, fts) <- withStore (\db -> getSndFileTransfer db user fileId) - unless cancelled $ cancelSndFile user ftm fts - SMDRcv -> do - ft@RcvFileTransfer {cancelled} <- withStore (\db -> getRcvFileTransfer db user fileId) - unless cancelled $ cancelRcvFileTransfer user ft + cancel' = forM_ fileStatus $ \(AFS dir status) -> + unless (ciFileEnded status) $ + case dir of + SMDSnd -> do + (ftm@FileTransferMeta {cancelled}, fts) <- withStore (\db -> getSndFileTransfer db user fileId) + unless cancelled $ cancelSndFile user ftm fts + SMDRcv -> do + ft@RcvFileTransfer {cancelled} <- withStore (\db -> getRcvFileTransfer db user fileId) + unless cancelled $ cancelRcvFileTransfer user ft delete = withFilesFolder $ \filesFolder -> forM_ filePath $ \fPath -> do let fsFilePath = filesFolder <> "/" <> fPath @@ -1400,15 +1401,40 @@ expireChatItems :: forall m. ChatMonad m => User -> Int64 -> Bool -> m () expireChatItems user ttl sync = do currentTs <- liftIO getCurrentTime let expirationDate = addUTCTime (-1 * fromIntegral ttl) currentTs + -- this is to keep group messages created during last 12 hours even if they're expired according to item_ts + createdAtCutoff = addUTCTime (-43200 :: NominalDiffTime) currentTs expire <- asks expireCIs - filesInfo <- withStore' $ \db -> getExpiredFileInfo db user expirationDate - loop filesInfo expirationDate expire + contacts <- withStore' (`getUserContacts` user) + contactsLoop contacts expirationDate expire + groups <- withStore' (`getUserGroupDetails` user) + groupsLoop groups expirationDate createdAtCutoff expire where - loop :: [CIFileInfo] -> UTCTime -> TVar Bool -> m () - loop [] expirationDate expire = continue expire $ withStore' (\db -> deleteExpiredCIs db user expirationDate) - loop (fileInfo : filesInfo) expirationDate expire = continue expire $ do - deleteFile user fileInfo - loop filesInfo expirationDate expire + contactsLoop :: [Contact] -> UTCTime -> TVar Bool -> m () + contactsLoop [] _ _ = pure () + contactsLoop (ct : cts) expirationDate expire = continue expire $ do + filesInfo <- withStore' $ \db -> getContactExpiredFileInfo db user ct expirationDate + maxItemTs_ <- withStore' $ \db -> getContactMaxItemTs db user ct + forM_ filesInfo $ \fileInfo -> deleteFile user fileInfo + withStore' $ \db -> deleteContactExpiredCIs db user ct expirationDate + withStore' $ \db -> do + ciCount_ <- getContactCICount db user ct + case (maxItemTs_, ciCount_) of + (Just ts, Just count) -> when (count == 0) $ updateContactTs db user ct ts + _ -> pure () + contactsLoop cts expirationDate expire + groupsLoop :: [GroupInfo] -> UTCTime -> UTCTime -> TVar Bool -> m () + groupsLoop [] _ _ _ = pure () + groupsLoop (gInfo : gInfos) expirationDate createdAtCutoff expire = continue expire $ do + filesInfo <- withStore' $ \db -> getGroupExpiredFileInfo db user gInfo expirationDate createdAtCutoff + maxItemTs_ <- withStore' $ \db -> getGroupMaxItemTs db user gInfo + forM_ filesInfo $ \fileInfo -> deleteFile user fileInfo + withStore' $ \db -> deleteGroupExpiredCIs db user gInfo expirationDate createdAtCutoff + withStore' $ \db -> do + ciCount_ <- getGroupCICount db user gInfo + case (maxItemTs_, ciCount_) of + (Just ts, Just count) -> when (count == 0) $ updateGroupTs db user gInfo ts + _ -> pure () + groupsLoop gInfos expirationDate createdAtCutoff expire continue :: TVar Bool -> m () -> m () continue expire = if sync then id else \a -> whenM (readTVarIO expire) $ threadDelay 100000 >> a diff --git a/src/Simplex/Chat/Messages.hs b/src/Simplex/Chat/Messages.hs index 614bada622..665e23f25f 100644 --- a/src/Simplex/Chat/Messages.hs +++ b/src/Simplex/Chat/Messages.hs @@ -395,7 +395,7 @@ instance StrEncoding ACIFileStatus where -- to conveniently read file data from db data CIFileInfo = CIFileInfo { fileId :: Int64, - fileStatus :: ACIFileStatus, + fileStatus :: Maybe ACIFileStatus, filePath :: Maybe FilePath } deriving (Show) diff --git a/src/Simplex/Chat/Migrations/M20221004_idx_msg_deliveries_message_id.hs b/src/Simplex/Chat/Migrations/M20221004_idx_msg_deliveries_message_id.hs new file mode 100644 index 0000000000..0e53923b58 --- /dev/null +++ b/src/Simplex/Chat/Migrations/M20221004_idx_msg_deliveries_message_id.hs @@ -0,0 +1,12 @@ +{-# LANGUAGE QuasiQuotes #-} + +module Simplex.Chat.Migrations.M20221004_idx_msg_deliveries_message_id where + +import Database.SQLite.Simple (Query) +import Database.SQLite.Simple.QQ (sql) + +m20221004_idx_msg_deliveries_message_id :: Query +m20221004_idx_msg_deliveries_message_id = + [sql| +CREATE INDEX idx_msg_deliveries_message_id ON msg_deliveries(message_id); +|] diff --git a/src/Simplex/Chat/Migrations/chat_schema.sql b/src/Simplex/Chat/Migrations/chat_schema.sql index 427c10a223..ba9366b586 100644 --- a/src/Simplex/Chat/Migrations/chat_schema.sql +++ b/src/Simplex/Chat/Migrations/chat_schema.sql @@ -426,3 +426,4 @@ CREATE UNIQUE INDEX idx_chat_items_group_shared_msg_id ON chat_items( group_member_id, shared_msg_id ); +CREATE INDEX idx_msg_deliveries_message_id ON msg_deliveries(message_id); diff --git a/src/Simplex/Chat/Store.hs b/src/Simplex/Chat/Store.hs index 462e836c81..d88c5740d2 100644 --- a/src/Simplex/Chat/Store.hs +++ b/src/Simplex/Chat/Store.hs @@ -194,11 +194,12 @@ module Simplex.Chat.Store getXGrpMemIntroContGroup, getChatItemTTL, setChatItemTTL, - getExpiredFileInfo, - deleteExpiredCIs, - getChatsWithExpiredItems, - getContactExpiredCIs, - getGroupExpiredCIs, + getContactExpiredFileInfo, + deleteContactExpiredCIs, + getContactCICount, + getGroupExpiredFileInfo, + deleteGroupExpiredCIs, + getGroupCICount, getPendingContactConnection, deletePendingContactConnection, updateContactSettings, @@ -224,7 +225,7 @@ import Data.Functor (($>)) import Data.Int (Int64) import Data.List (find, sortBy, sortOn) import Data.List.NonEmpty (NonEmpty) -import Data.Maybe (fromMaybe, isJust, listToMaybe, mapMaybe) +import Data.Maybe (fromMaybe, isJust, listToMaybe) import Data.Ord (Down (..)) import Data.Text (Text) import qualified Data.Text as T @@ -263,6 +264,7 @@ import Simplex.Chat.Migrations.M20220926_connection_alias import Simplex.Chat.Migrations.M20220928_settings import Simplex.Chat.Migrations.M20221001_shared_msg_id_indices import Simplex.Chat.Migrations.M20221003_delete_broken_integrity_error_chat_items +import Simplex.Chat.Migrations.M20221004_idx_msg_deliveries_message_id import Simplex.Chat.Protocol import Simplex.Chat.Types import Simplex.Messaging.Agent.Protocol (ACorrId, AgentMsgId, ConnId, InvitationId, MsgMeta (..)) @@ -301,7 +303,8 @@ schemaMigrations = ("20220926_connection_alias", m20220926_connection_alias), ("20220928_settings", m20220928_settings), ("20221001_shared_msg_id_indices", m20221001_shared_msg_id_indices), - ("20221003_delete_broken_integrity_error_chat_items", m20221003_delete_broken_integrity_error_chat_items) + ("20221003_delete_broken_integrity_error_chat_items", m20221003_delete_broken_integrity_error_chat_items), + ("20221004_idx_msg_deliveries_message_id", m20221004_idx_msg_deliveries_message_id) ] -- | The list of migrations in ascending order by date @@ -2456,7 +2459,7 @@ getContactFileInfo db User {userId} Contact {contactId} = |] (userId, contactId) -toFileInfo :: (Int64, ACIFileStatus, Maybe FilePath) -> CIFileInfo +toFileInfo :: (Int64, Maybe ACIFileStatus, Maybe FilePath) -> CIFileInfo toFileInfo (fileId, fileStatus, filePath) = CIFileInfo {fileId, fileStatus, filePath} getContactMaxItemTs :: DB.Connection -> User -> Contact -> IO (Maybe UTCTime) @@ -2465,21 +2468,16 @@ getContactMaxItemTs db User {userId} Contact {contactId} = DB.query db "SELECT MAX(item_ts) FROM chat_items WHERE user_id = ? AND contact_id = ?" (userId, contactId) deleteContactCIs :: DB.Connection -> User -> Contact -> IO () -deleteContactCIs db User {userId} Contact {contactId} = do - deleteContactCIsMessages_ +deleteContactCIs db user@User {userId} ct@Contact {contactId} = do + connIds <- getContactConnIds_ db user ct + forM_ connIds $ \connId -> + DB.execute db "DELETE FROM messages WHERE connection_id = ?" (Only connId) DB.execute db "DELETE FROM chat_items WHERE user_id = ? AND contact_id = ?" (userId, contactId) - where - deleteContactCIsMessages_ = - DB.execute - db - [sql| - DELETE FROM messages WHERE message_id IN ( - SELECT message_id FROM chat_item_messages WHERE chat_item_id IN ( - SELECT chat_item_id FROM chat_items WHERE user_id = ? AND contact_id = ? - ) - ) - |] - (userId, contactId) + +getContactConnIds_ :: DB.Connection -> User -> Contact -> IO [Int64] +getContactConnIds_ db User {userId} Contact {contactId} = + map fromOnly + <$> DB.query db "SELECT connection_id FROM connections WHERE user_id = ? AND contact_id = ?" (userId, contactId) updateContactTs :: DB.Connection -> User -> Contact -> UTCTime -> IO () updateContactTs db User {userId} Contact {contactId} updatedAt = @@ -2508,20 +2506,8 @@ getGroupMaxItemTs db User {userId} GroupInfo {groupId} = deleteGroupCIs :: DB.Connection -> User -> GroupInfo -> IO () deleteGroupCIs db User {userId} GroupInfo {groupId} = do - deleteGroupCIsMessages_ + DB.execute db "DELETE FROM messages WHERE group_id = ?" (Only groupId) DB.execute db "DELETE FROM chat_items WHERE user_id = ? AND group_id = ?" (userId, groupId) - where - deleteGroupCIsMessages_ = - DB.execute - db - [sql| - DELETE FROM messages WHERE message_id IN ( - SELECT message_id FROM chat_item_messages WHERE chat_item_id IN ( - SELECT chat_item_id FROM chat_items WHERE user_id = ? AND group_id = ? - ) - ) - |] - (userId, groupId) updateGroupTs :: DB.Connection -> User -> GroupInfo -> UTCTime -> IO () updateGroupTs db User {userId} GroupInfo {groupId} updatedAt = @@ -4107,8 +4093,8 @@ setChatItemTTL db User {userId} chatItemTTL = do "INSERT INTO settings (user_id, chat_item_ttl, created_at, updated_at) VALUES (?,?,?,?)" (userId, chatItemTTL, currentTs, currentTs) -getExpiredFileInfo :: DB.Connection -> User -> UTCTime -> IO [CIFileInfo] -getExpiredFileInfo db User {userId} expirationDate = +getContactExpiredFileInfo :: DB.Connection -> User -> Contact -> UTCTime -> IO [CIFileInfo] +getContactExpiredFileInfo db User {userId} Contact {contactId} expirationDate = map toFileInfo <$> DB.query db @@ -4116,79 +4102,44 @@ getExpiredFileInfo db User {userId} expirationDate = SELECT f.file_id, f.ci_file_status, f.file_path FROM chat_items i JOIN files f ON f.chat_item_id = i.chat_item_id - WHERE i.user_id = ? AND i.item_ts <= ? - |] - (userId, expirationDate) - -deleteExpiredCIs :: DB.Connection -> User -> UTCTime -> IO () -deleteExpiredCIs db User {userId} expirationDate = do - deleteExpiredCIsMessages_ - DB.execute db "DELETE FROM chat_items WHERE user_id = ? AND item_ts <= ?" (userId, expirationDate) - where - deleteExpiredCIsMessages_ = - DB.execute - db - [sql| - DELETE FROM messages WHERE message_id IN ( - SELECT message_id FROM chat_item_messages WHERE chat_item_id IN ( - SELECT chat_item_id FROM chat_items WHERE user_id = ? AND item_ts <= ? - ) - ) - |] - (userId, expirationDate) - -getChatsWithExpiredItems :: DB.Connection -> User -> UTCTime -> IO [ChatRef] -getChatsWithExpiredItems db User {userId} expirationDate = - mapMaybe toChatRef - <$> DB.query - db - [sql| - SELECT contact_id, group_id - FROM chat_items - WHERE user_id = ? AND item_ts <= ? - GROUP BY contact_id, group_id - ORDER BY contact_id ASC, group_id ASC - |] - (userId, expirationDate) - where - toChatRef :: (Maybe ContactId, Maybe GroupId) -> Maybe ChatRef - toChatRef (Just contactId, Nothing) = Just $ ChatRef CTDirect contactId - toChatRef (Nothing, Just groupId) = Just $ ChatRef CTGroup groupId - toChatRef _ = Nothing - -getContactExpiredCIs :: DB.Connection -> User -> ContactId -> UTCTime -> IO [(ChatItemId, Maybe CIFileInfo)] -getContactExpiredCIs db User {userId} contactId expirationDate = - map toItemIdAndFileInfo' - <$> DB.query - db - [sql| - SELECT i.chat_item_id, f.file_id, f.ci_file_status, f.file_path - FROM chat_items i - LEFT JOIN files f ON f.chat_item_id = i.chat_item_id - WHERE i.user_id = ? AND i.contact_id = ? AND i.item_ts <= ? - ORDER BY i.item_ts ASC + WHERE i.user_id = ? AND i.contact_id = ? AND i.created_at <= ? |] (userId, contactId, expirationDate) -getGroupExpiredCIs :: DB.Connection -> User -> Int64 -> UTCTime -> IO [(ChatItemId, Maybe CIFileInfo)] -getGroupExpiredCIs db User {userId} groupId expirationDate = - map toItemIdAndFileInfo' +deleteContactExpiredCIs :: DB.Connection -> User -> Contact -> UTCTime -> IO () +deleteContactExpiredCIs db user@User {userId} ct@Contact {contactId} expirationDate = do + connIds <- getContactConnIds_ db user ct + forM_ connIds $ \connId -> + DB.execute db "DELETE FROM messages WHERE connection_id = ? AND created_at <= ?" (connId, expirationDate) + DB.execute db "DELETE FROM chat_items WHERE user_id = ? AND contact_id = ? AND created_at <= ?" (userId, contactId, expirationDate) + +getContactCICount :: DB.Connection -> User -> Contact -> IO (Maybe Int64) +getContactCICount db User {userId} Contact {contactId} = + fmap join . maybeFirstRow fromOnly $ + DB.query db "SELECT COUNT(1) FROM chat_items WHERE user_id = ? AND contact_id = ?" (userId, contactId) + +getGroupExpiredFileInfo :: DB.Connection -> User -> GroupInfo -> UTCTime -> UTCTime -> IO [CIFileInfo] +getGroupExpiredFileInfo db User {userId} GroupInfo {groupId} expirationDate createdAtCutoff = + map toFileInfo <$> DB.query db [sql| - SELECT i.chat_item_id, f.file_id, f.ci_file_status, f.file_path + SELECT f.file_id, f.ci_file_status, f.file_path FROM chat_items i - LEFT JOIN files f ON f.chat_item_id = i.chat_item_id - WHERE i.user_id = ? AND i.group_id = ? AND i.item_ts <= ? - ORDER BY i.item_ts ASC + JOIN files f ON f.chat_item_id = i.chat_item_id + WHERE i.user_id = ? AND i.group_id = ? AND i.item_ts <= ? AND i.created_at <= ? |] - (userId, groupId, expirationDate) + (userId, groupId, expirationDate, createdAtCutoff) -toItemIdAndFileInfo' :: (ChatItemId, Maybe Int64, Maybe ACIFileStatus, Maybe FilePath) -> (ChatItemId, Maybe CIFileInfo) -toItemIdAndFileInfo' (chatItemId, fileId_, fileStatus_, filePath) = - case (fileId_, fileStatus_) of - (Just fileId, Just fileStatus) -> (chatItemId, Just CIFileInfo {fileId, fileStatus, filePath}) - _ -> (chatItemId, Nothing) +deleteGroupExpiredCIs :: DB.Connection -> User -> GroupInfo -> UTCTime -> UTCTime -> IO () +deleteGroupExpiredCIs db User {userId} GroupInfo {groupId} expirationDate createdAtCutoff = do + DB.execute db "DELETE FROM messages WHERE group_id = ? AND created_at <= ?" (groupId, min expirationDate createdAtCutoff) + DB.execute db "DELETE FROM chat_items WHERE user_id = ? AND group_id = ? AND item_ts <= ? AND created_at <= ?" (userId, groupId, expirationDate, createdAtCutoff) + +getGroupCICount :: DB.Connection -> User -> GroupInfo -> IO (Maybe Int64) +getGroupCICount db User {userId} GroupInfo {groupId} = + fmap join . maybeFirstRow fromOnly $ + DB.query db "SELECT COUNT(1) FROM chat_items WHERE user_id = ? AND group_id = ?" (userId, groupId) -- | Saves unique local display name based on passed displayName, suffixed with _N if required. -- This function should be called inside transaction. diff --git a/tests/ChatTests.hs b/tests/ChatTests.hs index 6eb78375bb..7002bf4cba 100644 --- a/tests/ChatTests.hs +++ b/tests/ChatTests.hs @@ -3225,7 +3225,10 @@ send :: TestCC -> String -> IO () send TestCC {chatController = cc} cmd = atomically $ writeTBQueue (inputQ cc) cmd (<##) :: TestCC -> String -> Expectation -cc <## line = getTermLine cc `shouldReturn` line +cc <## line = do + l <- getTermLine cc + when (l /= line) $ print ("expected: " <> line, ", got: " <> l) + l `shouldBe` line getInAnyOrder :: (String -> String) -> TestCC -> [String] -> Expectation getInAnyOrder _ _ [] = pure ()