From fd4eeb36db3851a3eac86bd4b9ff2992ea6bc079 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Mon, 22 Jan 2024 14:04:57 +0000 Subject: [PATCH] agent: optimize expired messages query (#976) * agent: optimize expired messages query * schema * fix query * fix * typo Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> * fix * refactor * comment * refactor2 --------- Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> --- simplexmq.cabal | 1 + src/Simplex/Messaging/Agent/Store/SQLite.hs | 37 +++++++++++++------ .../Agent/Store/SQLite/Migrations.hs | 4 +- .../M20240121_message_delivery_indexes.hs | 20 ++++++++++ .../Store/SQLite/Migrations/agent_schema.sql | 11 ++++++ 5 files changed, 60 insertions(+), 13 deletions(-) create mode 100644 src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20240121_message_delivery_indexes.hs diff --git a/simplexmq.cabal b/simplexmq.cabal index 6dedce077..1ab3c44ac 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -101,6 +101,7 @@ library Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230829_crypto_files Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231222_command_created_at Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231225_failed_work_items + Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240121_message_delivery_indexes Simplex.Messaging.Agent.TAsyncs Simplex.Messaging.Agent.TRcvQueues Simplex.Messaging.Client diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index f84c68967..06f41be9e 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -1043,18 +1043,31 @@ deletePendingMsgs db connId SndQueue {dbQueueId} = DB.execute db "DELETE FROM snd_message_deliveries WHERE conn_id = ? AND snd_queue_id = ?" (connId, dbQueueId) getExpiredSndMessages :: DB.Connection -> ConnId -> SndQueue -> UTCTime -> IO [InternalId] -getExpiredSndMessages db connId SndQueue {dbQueueId} expireTs = - map fromOnly - <$> DB.query - db - [sql| - SELECT d.internal_id - FROM snd_message_deliveries d - JOIN messages m ON m.conn_id = d.conn_id AND m.internal_id = d.internal_id - WHERE d.conn_id = ? AND d.snd_queue_id = ? AND d.failed = 0 AND m.internal_ts < ? - ORDER BY d.internal_id ASC - |] - (connId, dbQueueId, expireTs) +getExpiredSndMessages db connId SndQueue {dbQueueId} expireTs = do + -- type is Maybe InternalId because MAX always returns one row, possibly with NULL value + maxId :: [Maybe InternalId] <- + map fromOnly + <$> DB.query + db + [sql| + SELECT MAX(internal_id) + FROM messages + WHERE conn_id = ? AND internal_snd_id IS NOT NULL AND internal_ts < ? + |] + (connId, expireTs) + case maxId of + Just msgId : _ -> + map fromOnly + <$> DB.query + db + [sql| + SELECT internal_id + FROM snd_message_deliveries + WHERE conn_id = ? AND snd_queue_id = ? AND failed = 0 AND internal_id <= ? + ORDER BY internal_id ASC + |] + (connId, dbQueueId, msgId) + _ -> pure [] setMsgUserAck :: DB.Connection -> ConnId -> InternalId -> IO (Either StoreError (RcvQueue, SMP.MsgId)) setMsgUserAck db connId agentMsgId = runExceptT $ do diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs index f41a34291..2d8ad3a8c 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs @@ -67,6 +67,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230814_indexes import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230829_crypto_files import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231222_command_created_at import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231225_failed_work_items +import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240121_message_delivery_indexes import Simplex.Messaging.Encoding.String import Simplex.Messaging.Parsers (dropPrefix, sumTypeJSON) import Simplex.Messaging.Transport.Client (TransportHost) @@ -102,7 +103,8 @@ schemaMigrations = ("m20230814_indexes", m20230814_indexes, Just down_m20230814_indexes), ("m20230829_crypto_files", m20230829_crypto_files, Just down_m20230829_crypto_files), ("m20231222_command_created_at", m20231222_command_created_at, Just down_m20231222_command_created_at), - ("m20231225_failed_work_items", m20231225_failed_work_items, Just down_m20231225_failed_work_items) + ("m20231225_failed_work_items", m20231225_failed_work_items, Just down_m20231225_failed_work_items), + ("m20240121_message_delivery_indexes", m20240121_message_delivery_indexes, Just down_m20240121_message_delivery_indexes) ] -- | The list of migrations in ascending order by date diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20240121_message_delivery_indexes.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20240121_message_delivery_indexes.hs new file mode 100644 index 000000000..345f3dfda --- /dev/null +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20240121_message_delivery_indexes.hs @@ -0,0 +1,20 @@ +{-# LANGUAGE QuasiQuotes #-} + +module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240121_message_delivery_indexes where + +import Database.SQLite.Simple (Query) +import Database.SQLite.Simple.QQ (sql) + +m20240121_message_delivery_indexes :: Query +m20240121_message_delivery_indexes = + [sql| +CREATE INDEX idx_messages_snd_expired ON messages(conn_id, internal_snd_id, internal_ts); +CREATE INDEX idx_snd_message_deliveries_expired ON snd_message_deliveries(conn_id, snd_queue_id, failed, internal_id); +|] + +down_m20240121_message_delivery_indexes :: Query +down_m20240121_message_delivery_indexes = + [sql| +DROP INDEX idx_messages_snd_expired; +DROP INDEX idx_snd_message_deliveries_expired; +|] diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql index db80c50be..4ff419401 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql @@ -497,3 +497,14 @@ CREATE INDEX idx_commands_server_commands ON commands( CREATE INDEX idx_rcv_files_status_created_at ON rcv_files(status, created_at); CREATE INDEX idx_snd_files_status_created_at ON snd_files(status, created_at); CREATE INDEX idx_snd_files_snd_file_entity_id ON snd_files(snd_file_entity_id); +CREATE INDEX idx_messages_snd_expired ON messages( + conn_id, + internal_snd_id, + internal_ts +); +CREATE INDEX idx_snd_message_deliveries_expired ON snd_message_deliveries( + conn_id, + snd_queue_id, + failed, + internal_id +);