diff --git a/src/Simplex/Messaging/Server/MsgStore/Postgres.hs b/src/Simplex/Messaging/Server/MsgStore/Postgres.hs index 0c714e089..a2efb4563 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Postgres.hs @@ -106,7 +106,7 @@ instance MsgStoreClass PostgresMsgStore where expireOldMessages :: Bool -> PostgresMsgStore -> Int64 -> Int64 -> IO MessageStats expireOldMessages _tty ms now ttl = maybeFirstRow' newMessageStats toMessageStats $ withConnection st $ \db -> - DB.query db "CALL expire_old_messages(?,?,0,0,0)" (now, ttl) + DB.query db "CALL expire_old_messages(?,?,?,0,0,0)" (now, ttl, 10000 :: Int) where st = dbStore $ queueStore_ ms toMessageStats (expiredMsgsCount, storedMsgsCount, storedQueues) = diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs index ae150614c..48f22cf81 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs @@ -183,6 +183,9 @@ CREATE INDEX idx_messages_recipient_id_message_id ON messages (recipient_id, mes CREATE INDEX idx_messages_recipient_id_msg_ts on messages(recipient_id, msg_ts); CREATE INDEX idx_messages_recipient_id_msg_quota on messages(recipient_id, msg_quota); +DROP INDEX idx_msg_queues_updated_at; +CREATE INDEX idx_msg_queues_updated_at_recipient_id ON msg_queues (deleted_at, updated_at, msg_queue_size, recipient_id); + CREATE FUNCTION write_message( p_recipient_id BYTEA, p_msg_id BYTEA, @@ -232,18 +235,33 @@ BEGIN WHERE recipient_id = p_recipient_id AND deleted_at IS NULL FOR UPDATE; - IF FOUND THEN - SELECT message_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body INTO msg - FROM messages - WHERE recipient_id = p_recipient_id - ORDER BY message_id ASC LIMIT 1; + IF NOT FOUND THEN + RETURN; + END IF; - IF FOUND AND msg.msg_id = p_msg_id THEN - DELETE FROM messages WHERE message_id = msg.message_id; - IF FOUND THEN - CALL dec_msg_count(p_recipient_id, q_size, 1); - RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body); - END IF; + SELECT message_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body + INTO msg + FROM messages + WHERE recipient_id = p_recipient_id + ORDER BY message_id ASC LIMIT 1; + + IF NOT FOUND THEN + IF q_size != 0 THEN + UPDATE msg_queues + SET msg_can_write = TRUE, msg_queue_size = 0 + WHERE recipient_id = p_recipient_id; + END IF; + RETURN; + END IF; + + IF msg.msg_id = p_msg_id THEN + DELETE FROM messages WHERE message_id = msg.message_id; + IF FOUND THEN + UPDATE msg_queues + SET msg_can_write = msg_can_write OR msg_queue_size <= 1, + msg_queue_size = GREATEST(msg_queue_size - 1, 0) + WHERE recipient_id = p_recipient_id; + RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body); END IF; END IF; END; @@ -255,37 +273,61 @@ LANGUAGE plpgsql AS $$ DECLARE q_size BIGINT; msg RECORD; + msg_deleted BOOLEAN; BEGIN SELECT msg_queue_size INTO q_size FROM msg_queues WHERE recipient_id = p_recipient_id AND deleted_at IS NULL FOR UPDATE; - IF FOUND THEN - SELECT message_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body INTO msg + IF NOT FOUND THEN + RETURN; + END IF; + + SELECT message_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body + INTO msg + FROM messages + WHERE recipient_id = p_recipient_id + ORDER BY message_id ASC LIMIT 1; + + IF NOT FOUND THEN + IF q_size != 0 THEN + UPDATE msg_queues + SET msg_can_write = TRUE, msg_queue_size = 0 + WHERE recipient_id = p_recipient_id; + END IF; + RETURN; + END IF; + + IF msg.msg_id = p_msg_id THEN + DELETE FROM messages WHERE message_id = msg.message_id; + + msg_deleted := FOUND; + IF msg_deleted THEN + RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body); + END IF; + + SELECT msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body + INTO msg FROM messages WHERE recipient_id = p_recipient_id ORDER BY message_id ASC LIMIT 1; IF FOUND THEN - IF msg.msg_id = p_msg_id THEN - DELETE FROM messages WHERE message_id = msg.message_id; - - IF FOUND THEN - CALL dec_msg_count(p_recipient_id, q_size, 1); - RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body); - END IF; - - RETURN QUERY ( - SELECT msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body - FROM messages - WHERE recipient_id = p_recipient_id - ORDER BY message_id ASC LIMIT 1 - ); - ELSE - RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body); + RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body); + IF msg_deleted THEN + UPDATE msg_queues + SET msg_can_write = msg_can_write OR msg_queue_size <= 1, + msg_queue_size = GREATEST(msg_queue_size - 1, 0) + WHERE recipient_id = p_recipient_id; END IF; + ELSIF msg_deleted OR q_size != 0 THEN + UPDATE msg_queues + SET msg_can_write = TRUE, msg_queue_size = 0 + WHERE recipient_id = p_recipient_id; END IF; + ELSE + RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body); END IF; END; $$; @@ -319,7 +361,10 @@ BEGIN GET DIAGNOSTICS del_count = ROW_COUNT; IF del_count > 0 THEN - CALL dec_msg_count(p_recipient_id, q_size, del_count); + UPDATE msg_queues + SET msg_can_write = msg_can_write OR msg_queue_size <= del_count, + msg_queue_size = GREATEST(msg_queue_size - del_count, 0) + WHERE recipient_id = p_recipient_id; END IF; RETURN del_count; END; @@ -328,6 +373,7 @@ $$; CREATE PROCEDURE expire_old_messages( p_now_ts BIGINT, p_ttl BIGINT, + batch_size INT, OUT r_expired_msgs_count BIGINT, OUT r_stored_msgs_count BIGINT, OUT r_stored_queues BIGINT @@ -336,42 +382,46 @@ LANGUAGE plpgsql AS $$ DECLARE old_ts BIGINT := p_now_ts - p_ttl; very_old_ts BIGINT := p_now_ts - 2 * p_ttl - 86400; + rids BYTEA[]; rid BYTEA; - min_id BIGINT; - q_size BIGINT; + last_rid BYTEA := '\x'; del_count BIGINT; total_deleted BIGINT := 0; BEGIN - FOR rid IN - SELECT recipient_id - FROM msg_queues - WHERE deleted_at IS NULL AND updated_at > very_old_ts LOOP - BEGIN -- sub-transaction for each queue - del_count := delete_expired_msgs(rid, old_ts); - total_deleted := total_deleted + del_count; - EXCEPTION WHEN OTHERS THEN - ROLLBACK; - RAISE WARNING 'STORE, expire_old_messages, error expiring queue %: %', encode(rid, 'base64'), SQLERRM; - CONTINUE; - END; - COMMIT; + SELECT array_agg(recipient_id) + INTO rids + FROM ( + SELECT recipient_id + FROM msg_queues + WHERE deleted_at IS NULL + AND updated_at > very_old_ts + AND msg_queue_size > 0 + AND recipient_id > last_rid + ORDER BY recipient_id ASC + LIMIT batch_size + ) qs; + + EXIT WHEN rids IS NULL OR cardinality(rids) = 0; + + FOREACH rid IN ARRAY rids + LOOP + BEGIN + del_count := delete_expired_msgs(rid, old_ts); + total_deleted := total_deleted + del_count; + EXCEPTION WHEN OTHERS THEN + RAISE WARNING 'STORE, expire_old_messages, error expiring queue %: %', encode(rid, 'base64'), SQLERRM; + CONTINUE; + END; + COMMIT; + END LOOP; + last_rid := rids[cardinality(rids)]; END LOOP; r_expired_msgs_count := total_deleted; r_stored_msgs_count := (SELECT COUNT(1) FROM messages); r_stored_queues := (SELECT COUNT(1) FROM msg_queues WHERE deleted_at IS NULL); END; -$$; - -CREATE PROCEDURE dec_msg_count(p_recipient_id BYTEA, p_size BIGINT, p_change BIGINT) -LANGUAGE plpgsql AS $$ -BEGIN - UPDATE msg_queues - SET msg_can_write = msg_can_write OR p_size <= p_change, - msg_queue_size = GREATEST(p_size - p_change, 0) - WHERE recipient_id = p_recipient_id; -END; $$; |] @@ -384,7 +434,9 @@ DROP FUNCTION try_del_msg; DROP FUNCTION try_del_peek_msg; DROP FUNCTION delete_expired_msgs; DROP PROCEDURE expire_old_messages; -DROP PROCEDURE dec_msg_count; + +DROP INDEX idx_msg_queues_updated_at_recipient_id; +CREATE INDEX idx_msg_queues_updated_at ON msg_queues (deleted_at, updated_at); DROP INDEX idx_messages_recipient_id_message_id; DROP INDEX idx_messages_recipient_id_msg_ts; diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres/server_schema.sql b/src/Simplex/Messaging/Server/QueueStore/Postgres/server_schema.sql index db49006e2..5a98edb0d 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres/server_schema.sql +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres/server_schema.sql @@ -16,19 +16,6 @@ CREATE SCHEMA smp_server; -CREATE PROCEDURE smp_server.dec_msg_count(IN p_recipient_id bytea, IN p_size bigint, IN p_change bigint) - LANGUAGE plpgsql - AS $$ -BEGIN - UPDATE msg_queues - SET msg_can_write = msg_can_write OR p_size <= p_change, - msg_queue_size = GREATEST(p_size - p_change, 0) - WHERE recipient_id = p_recipient_id; -END; -$$; - - - CREATE FUNCTION smp_server.delete_expired_msgs(p_recipient_id bytea, p_old_ts bigint) RETURNS bigint LANGUAGE plpgsql AS $$ @@ -59,7 +46,10 @@ BEGIN GET DIAGNOSTICS del_count = ROW_COUNT; IF del_count > 0 THEN - CALL dec_msg_count(p_recipient_id, q_size, del_count); + UPDATE msg_queues + SET msg_can_write = msg_can_write OR msg_queue_size <= del_count, + msg_queue_size = GREATEST(msg_queue_size - del_count, 0) + WHERE recipient_id = p_recipient_id; END IF; RETURN del_count; END; @@ -67,32 +57,46 @@ $$; -CREATE PROCEDURE smp_server.expire_old_messages(IN p_now_ts bigint, IN p_ttl bigint, OUT r_expired_msgs_count bigint, OUT r_stored_msgs_count bigint, OUT r_stored_queues bigint) +CREATE PROCEDURE smp_server.expire_old_messages(IN p_now_ts bigint, IN p_ttl bigint, IN batch_size integer, OUT r_expired_msgs_count bigint, OUT r_stored_msgs_count bigint, OUT r_stored_queues bigint) LANGUAGE plpgsql AS $$ DECLARE old_ts BIGINT := p_now_ts - p_ttl; very_old_ts BIGINT := p_now_ts - 2 * p_ttl - 86400; + rids BYTEA[]; rid BYTEA; - min_id BIGINT; - q_size BIGINT; + last_rid BYTEA := '\x'; del_count BIGINT; total_deleted BIGINT := 0; BEGIN - FOR rid IN - SELECT recipient_id - FROM msg_queues - WHERE deleted_at IS NULL AND updated_at > very_old_ts LOOP - BEGIN -- sub-transaction for each queue - del_count := delete_expired_msgs(rid, old_ts); - total_deleted := total_deleted + del_count; - EXCEPTION WHEN OTHERS THEN - ROLLBACK; - RAISE WARNING 'STORE, expire_old_messages, error expiring queue %: %', encode(rid, 'base64'), SQLERRM; - CONTINUE; - END; - COMMIT; + SELECT array_agg(recipient_id) + INTO rids + FROM ( + SELECT recipient_id + FROM msg_queues + WHERE deleted_at IS NULL + AND updated_at > very_old_ts + AND msg_queue_size > 0 + AND recipient_id > last_rid + ORDER BY recipient_id ASC + LIMIT batch_size + ) qs; + + EXIT WHEN rids IS NULL OR cardinality(rids) = 0; + + FOREACH rid IN ARRAY rids + LOOP + BEGIN + del_count := delete_expired_msgs(rid, old_ts); + total_deleted := total_deleted + del_count; + EXCEPTION WHEN OTHERS THEN + RAISE WARNING 'STORE, expire_old_messages, error expiring queue %: %', encode(rid, 'base64'), SQLERRM; + CONTINUE; + END; + COMMIT; + END LOOP; + last_rid := rids[cardinality(rids)]; END LOOP; r_expired_msgs_count := total_deleted; @@ -115,18 +119,33 @@ BEGIN WHERE recipient_id = p_recipient_id AND deleted_at IS NULL FOR UPDATE; - IF FOUND THEN - SELECT message_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body INTO msg - FROM messages - WHERE recipient_id = p_recipient_id - ORDER BY message_id ASC LIMIT 1; + IF NOT FOUND THEN + RETURN; + END IF; - IF FOUND AND msg.msg_id = p_msg_id THEN - DELETE FROM messages WHERE message_id = msg.message_id; - IF FOUND THEN - CALL dec_msg_count(p_recipient_id, q_size, 1); - RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body); - END IF; + SELECT message_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body + INTO msg + FROM messages + WHERE recipient_id = p_recipient_id + ORDER BY message_id ASC LIMIT 1; + + IF NOT FOUND THEN + IF q_size != 0 THEN + UPDATE msg_queues + SET msg_can_write = TRUE, msg_queue_size = 0 + WHERE recipient_id = p_recipient_id; + END IF; + RETURN; + END IF; + + IF msg.msg_id = p_msg_id THEN + DELETE FROM messages WHERE message_id = msg.message_id; + IF FOUND THEN + UPDATE msg_queues + SET msg_can_write = msg_can_write OR msg_queue_size <= 1, + msg_queue_size = GREATEST(msg_queue_size - 1, 0) + WHERE recipient_id = p_recipient_id; + RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body); END IF; END IF; END; @@ -140,37 +159,61 @@ CREATE FUNCTION smp_server.try_del_peek_msg(p_recipient_id bytea, p_msg_id bytea DECLARE q_size BIGINT; msg RECORD; + msg_deleted BOOLEAN; BEGIN SELECT msg_queue_size INTO q_size FROM msg_queues WHERE recipient_id = p_recipient_id AND deleted_at IS NULL FOR UPDATE; - IF FOUND THEN - SELECT message_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body INTO msg + IF NOT FOUND THEN + RETURN; + END IF; + + SELECT message_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body + INTO msg + FROM messages + WHERE recipient_id = p_recipient_id + ORDER BY message_id ASC LIMIT 1; + + IF NOT FOUND THEN + IF q_size != 0 THEN + UPDATE msg_queues + SET msg_can_write = TRUE, msg_queue_size = 0 + WHERE recipient_id = p_recipient_id; + END IF; + RETURN; + END IF; + + IF msg.msg_id = p_msg_id THEN + DELETE FROM messages WHERE message_id = msg.message_id; + + msg_deleted := FOUND; + IF msg_deleted THEN + RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body); + END IF; + + SELECT msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body + INTO msg FROM messages WHERE recipient_id = p_recipient_id ORDER BY message_id ASC LIMIT 1; IF FOUND THEN - IF msg.msg_id = p_msg_id THEN - DELETE FROM messages WHERE message_id = msg.message_id; - - IF FOUND THEN - CALL dec_msg_count(p_recipient_id, q_size, 1); - RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body); - END IF; - - RETURN QUERY ( - SELECT msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body - FROM messages - WHERE recipient_id = p_recipient_id - ORDER BY message_id ASC LIMIT 1 - ); - ELSE - RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body); + RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body); + IF msg_deleted THEN + UPDATE msg_queues + SET msg_can_write = msg_can_write OR msg_queue_size <= 1, + msg_queue_size = GREATEST(msg_queue_size - 1, 0) + WHERE recipient_id = p_recipient_id; END IF; + ELSIF msg_deleted OR q_size != 0 THEN + UPDATE msg_queues + SET msg_can_write = TRUE, msg_queue_size = 0 + WHERE recipient_id = p_recipient_id; END IF; + ELSE + RETURN QUERY VALUES (msg.msg_id, msg.msg_ts, msg.msg_quota, msg.msg_ntf_flag, msg.msg_body); END IF; END; $$; @@ -332,7 +375,7 @@ CREATE UNIQUE INDEX idx_msg_queues_sender_id ON smp_server.msg_queues USING btre -CREATE INDEX idx_msg_queues_updated_at ON smp_server.msg_queues USING btree (deleted_at, updated_at); +CREATE INDEX idx_msg_queues_updated_at_recipient_id ON smp_server.msg_queues USING btree (deleted_at, updated_at, msg_queue_size, recipient_id); diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index dd5d22d2e..ed5095fb4 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -26,6 +26,7 @@ import Control.Monad.Trans.Except import Crypto.Random (ChaChaDRG) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B +import Data.Int (Int64) import Data.List (isPrefixOf, isSuffixOf) import Data.Maybe (fromJust) import Data.Time.Clock (addUTCTime) @@ -33,7 +34,7 @@ import Data.Time.Clock.System (SystemTime (..), getSystemTime) import SMPClient (testStoreLogFile, testStoreMsgsDir, testStoreMsgsDir2, testStoreMsgsFile, testStoreMsgsFile2) import Simplex.Messaging.Crypto (pattern MaxLenBS) import qualified Simplex.Messaging.Crypto as C -import Simplex.Messaging.Protocol (EntityId (..), LinkId, Message (..), QueueLinkData, RecipientId, SParty (..), noMsgFlags) +import Simplex.Messaging.Protocol (EntityId (..), ErrorType, LinkId, Message (..), QueueLinkData, RecipientId, SParty (..), noMsgFlags) import Simplex.Messaging.Server (exportMessages, importMessages, printMessageStats) import Simplex.Messaging.Server.Env.STM (MsgStore (..), journalMsgStoreDepth, readWriteQueueStore) import Simplex.Messaging.Server.Expiration (ExpirationConfig (..), expireBeforeEpoch) @@ -42,6 +43,7 @@ import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.QueueStore.QueueInfo +import Simplex.Messaging.Server.QueueStore.Types import Simplex.Messaging.Server.StoreLog (closeStoreLog, logCreateQueue) import System.Directory (copyFile, createDirectoryIfMissing, listDirectory, removeFile, renameFile) import System.FilePath (()) @@ -50,9 +52,12 @@ import Test.Hspec hiding (fit, it) import Util #if defined(dbServerPostgres) +import Database.PostgreSQL.Simple (Only (..)) +import qualified Database.PostgreSQL.Simple as DB +import Simplex.Messaging.Agent.Store.Postgres.Common import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation (..)) import Simplex.Messaging.Server.MsgStore.Postgres -import Simplex.Messaging.Server.QueueStore.Postgres.Config +import Simplex.Messaging.Server.QueueStore.Postgres import SMPClient (postgressBracket, testServerDBConnectInfo, testStoreDBOpts) #endif @@ -70,7 +75,10 @@ msgStoreTests = do someMsgStoreTests journalMsgStoreTests around (withMsgStore testPostgresStoreConfig) $ - describe "Postgres-only message store" $ someMsgStoreTests + describe "Postgres-only message store" $ do + someMsgStoreTests + it "should correctly update message counts and canWrite flag" testUpdateMessageCounts + it "tryDelPeekMsg (ACK not from NSE) should reset message counts when queue is empty" testResetMessageCounts #endif describe "Journal message store: queue state backup expiration" $ do it "should remove old queue state backups" testRemoveQueueStateBackups @@ -310,6 +318,77 @@ testExportImportStore ms = do exportMessages False (StoreMemory stmStore) testStoreMsgsFile False (B.sort <$> B.readFile testStoreMsgsFile `shouldReturn`) =<< (B.sort <$> B.readFile (testStoreMsgsFile2 <> ".bak")) +#if defined(dbServerPostgres) +testUpdateMessageCounts :: PostgresMsgStore -> IO () +testUpdateMessageCounts ms = do + g <- C.newRandom + (rId, qr) <- testNewQueueRec g QMMessaging + runRight_ $ do + q <- ExceptT $ addQueue ms rId qr + let write s = writeMsg ms q True =<< mkMessage s + hasSize = checkQueueSize ms + q `hasSize` (0, True) + Just (Message {msgId = mId1}, True) <- write "message 1" + q `hasSize` (1, True) + Just (Message {msgId = mId2}, False) <- write "message 2" + q `hasSize` (2, True) + Just (Message {msgId = mId3}, False) <- write "message 3" + q `hasSize` (3, True) + Nothing <- write "message 4" + q `hasSize` (4, False) + Msg "message 1" <- tryPeekMsg ms q + q `hasSize` (4, False) + Msg "message 1" <- tryDelMsg ms q mId1 + q `hasSize` (3, False) + Msg "message 2" <- tryPeekMsg ms q + (Msg "message 2", Msg "message 3") <- tryDelPeekMsg ms q mId2 + q `hasSize` (2, False) + (Msg "message 3", Just MessageQuota {msgId = mId4}) <- tryDelPeekMsg ms q mId3 + q `hasSize` (1, False) + (Just MessageQuota {}, Nothing) <- tryDelPeekMsg ms q mId4 + q `hasSize` (0, True) + +checkQueueSize :: PostgresMsgStore -> PostgresQueue -> (Int64, Bool) -> ExceptT ErrorType IO () +checkQueueSize ms q (size, canWrt) = liftIO $ do + [(size', canWrt')] <- + withTransaction (dbStore $ queueStore ms) $ \db -> + DB.query db "SELECT msg_queue_size, msg_can_write FROM msg_queues WHERE recipient_id = ?" (Only (recipientId q)) + size' `shouldBe` size + canWrt' `shouldBe` canWrt + +testResetMessageCounts :: PostgresMsgStore -> IO () +testResetMessageCounts ms = do + g <- C.newRandom + (rId, qr) <- testNewQueueRec g QMMessaging + runRight_ $ do + q <- ExceptT $ addQueue ms rId qr + let write s = writeMsg ms q True =<< mkMessage s + hasSize = checkQueueSize ms + Just (Message {msgId = mId1}, True) <- write "message 1" + Just (Message {msgId = mId2}, False) <- write "message 2" + Just (Message {msgId = mId3}, False) <- write "message 3" + Nothing <- write "message 4" + q `hasSize` (4, False) + liftIO $ setIncorrectSize q (10, True) + Nothing <- write "message 5" + q `hasSize` (11, False) + (Msg "message 1", Msg "message 2") <- tryDelPeekMsg ms q mId1 + q `hasSize` (10, False) + (Msg "message 2", Msg "message 3") <- tryDelPeekMsg ms q mId2 + q `hasSize` (9, False) + (Msg "message 3", Just MessageQuota {msgId = mId4}) <- tryDelPeekMsg ms q mId3 + q `hasSize` (8, False) + (Just MessageQuota {}, Just MessageQuota {msgId = mId5}) <- tryDelPeekMsg ms q mId4 + q `hasSize` (7, False) + (Just MessageQuota {}, Nothing) <- tryDelPeekMsg ms q mId5 + q `hasSize` (0, True) -- reset + where + setIncorrectSize :: PostgresQueue -> (Int64, Bool) -> IO () + setIncorrectSize q (size, canWrt) = + void $ withTransaction (dbStore $ queueStore ms) $ \db -> + DB.execute db "UPDATE msg_queues SET msg_queue_size = ?, msg_can_write = ? WHERE recipient_id = ?" (size, canWrt, recipientId q) +#endif + testQueueState :: JournalMsgStore s -> IO () testQueueState ms = do g <- C.newRandom