diff --git a/src/Simplex/Chat.hs b/src/Simplex/Chat.hs index 3370825fb0..f8a052337b 100644 --- a/src/Simplex/Chat.hs +++ b/src/Simplex/Chat.hs @@ -4019,15 +4019,11 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do ackMsgDeliveryEvent :: Connection -> CommandId -> m () ackMsgDeliveryEvent Connection {connId} ackCmdId = - withStoreCtx' - (Just $ "createRcvMsgDeliveryEvent, connId: " <> show connId <> ", ackCmdId: " <> show ackCmdId <> ", msgDeliveryStatus: MDSRcvAcknowledged") - $ \db -> createRcvMsgDeliveryEvent db connId ackCmdId MDSRcvAcknowledged + withStore' $ \db -> updateRcvMsgDeliveryStatus db connId ackCmdId MDSRcvAcknowledged sentMsgDeliveryEvent :: Connection -> AgentMsgId -> m () sentMsgDeliveryEvent Connection {connId} msgId = - withStoreCtx - (Just $ "createSndMsgDeliveryEvent, connId: " <> show connId <> ", msgId: " <> show msgId <> ", msgDeliveryStatus: MDSSndSent") - $ \db -> createSndMsgDeliveryEvent db connId msgId MDSSndSent + withStore' $ \db -> updateSndMsgDeliveryStatus db connId msgId MDSSndSent agentErrToItemStatus :: AgentErrorType -> CIStatus 'MDSnd agentErrToItemStatus (SMP AUTH) = CISSndErrorAuth @@ -5236,18 +5232,20 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do XGrpInfo p' -> xGrpInfo gInfo author p' rcvMsg msgTs _ -> messageError $ "x.grp.msg.forward: unsupported forwarded event " <> T.pack (show $ toCMEventTag event) + -- TODO [batch send] update status of all messages in batch (getChatItemIdByAgentMsgId to return [ChatItemId]) directMsgReceived :: Contact -> Connection -> MsgMeta -> NonEmpty MsgReceipt -> m () directMsgReceived ct conn@Connection {connId} msgMeta msgRcpts = do checkIntegrityCreateItem (CDDirectRcv ct) msgMeta forM_ msgRcpts $ \MsgReceipt {agentMsgId, msgRcptStatus} -> do - withStore $ \db -> createSndMsgDeliveryEvent db connId agentMsgId $ MDSSndRcvd msgRcptStatus + withStore' $ \db -> updateSndMsgDeliveryStatus db connId agentMsgId $ MDSSndRcvd msgRcptStatus updateDirectItemStatus ct conn agentMsgId $ CISSndRcvd msgRcptStatus SSPComplete + -- TODO [batch send] same as for directMsgReceived groupMsgReceived :: GroupInfo -> GroupMember -> Connection -> MsgMeta -> NonEmpty MsgReceipt -> m () groupMsgReceived gInfo m conn@Connection {connId} msgMeta msgRcpts = do checkIntegrityCreateItem (CDGroupRcv gInfo m) msgMeta forM_ msgRcpts $ \MsgReceipt {agentMsgId, msgRcptStatus} -> do - withStore $ \db -> createSndMsgDeliveryEvent db connId agentMsgId $ MDSSndRcvd msgRcptStatus + withStore' $ \db -> updateSndMsgDeliveryStatus db connId agentMsgId $ MDSSndRcvd msgRcptStatus updateGroupItemStatus gInfo m conn agentMsgId $ CISSndRcvd msgRcptStatus SSPComplete updateDirectItemStatus :: Contact -> Connection -> AgentMsgId -> CIStatus 'MDSnd -> m () @@ -5639,7 +5637,7 @@ sendPendingGroupMessages user GroupMember {groupMemberId, localDisplayName} conn saveDirectRcvMSG :: ChatMonad m => Connection -> MsgMeta -> CommandId -> MsgBody -> m (Connection, RcvMessage) saveDirectRcvMSG conn@Connection {connId} agentMsgMeta agentAckCmdId msgBody = do -- TODO [batch send] refactor direct message processing same as groups - case (parseChatMessages msgBody) of + case parseChatMessages msgBody of [Right (ACMsg _ ChatMessage {chatVRange, msgId = sharedMsgId_, chatMsgEvent})] -> do conn' <- updatePeerChatVRange conn chatVRange let agentMsgId = fst $ recipient agentMsgMeta diff --git a/src/Simplex/Chat/Migrations/M20231206_recreate_msg_deliveries.hs b/src/Simplex/Chat/Migrations/M20231206_recreate_msg_deliveries.hs index 6477f42b2c..3e54dc1e82 100644 --- a/src/Simplex/Chat/Migrations/M20231206_recreate_msg_deliveries.hs +++ b/src/Simplex/Chat/Migrations/M20231206_recreate_msg_deliveries.hs @@ -16,7 +16,7 @@ DROP INDEX idx_msg_deliveries_agent_ack_cmd_id; CREATE TABLE new_msg_deliveries( msg_delivery_id INTEGER PRIMARY KEY, - message_id INTEGER NOT NULL REFERENCES messages ON DELETE CASCADE, -- non UNIQUE for group messages + message_id INTEGER NOT NULL REFERENCES messages ON DELETE CASCADE, -- non UNIQUE for group messages and for batched messages connection_id INTEGER NOT NULL REFERENCES connections ON DELETE CASCADE, agent_msg_id INTEGER, -- internal agent message ID (NULL while pending), non UNIQUE for batched messages agent_msg_meta TEXT, -- JSON with timestamps etc. sent in MSG, NULL for sent diff --git a/src/Simplex/Chat/Migrations/chat_schema.sql b/src/Simplex/Chat/Migrations/chat_schema.sql index 19b4d72379..3e3776d8f7 100644 --- a/src/Simplex/Chat/Migrations/chat_schema.sql +++ b/src/Simplex/Chat/Migrations/chat_schema.sql @@ -330,18 +330,6 @@ CREATE TABLE messages( author_group_member_id INTEGER REFERENCES group_members ON DELETE SET NULL, forwarded_by_group_member_id INTEGER REFERENCES group_members ON DELETE SET NULL ); -CREATE TABLE msg_deliveries( - msg_delivery_id INTEGER PRIMARY KEY, - message_id INTEGER NOT NULL REFERENCES messages ON DELETE CASCADE, -- non UNIQUE for group messages - connection_id INTEGER NOT NULL REFERENCES connections ON DELETE CASCADE, - agent_msg_id INTEGER, -- internal agent message ID(NULL while pending) - agent_msg_meta TEXT, -- JSON with timestamps etc. sent in MSG, NULL for sent - chat_ts TEXT NOT NULL DEFAULT(datetime('now')), - created_at TEXT CHECK(created_at NOT NULL), - updated_at TEXT CHECK(updated_at NOT NULL), - agent_ack_cmd_id INTEGER, -- broker_ts for received, created_at for sent - UNIQUE(connection_id, agent_msg_id) -); CREATE TABLE pending_group_messages( pending_group_message_id INTEGER PRIMARY KEY, group_member_id INTEGER NOT NULL REFERENCES group_members ON DELETE CASCADE, @@ -449,13 +437,6 @@ CREATE TABLE extra_xftp_file_descriptions( created_at TEXT NOT NULL DEFAULT(datetime('now')), updated_at TEXT NOT NULL DEFAULT(datetime('now')) ); -CREATE TABLE msg_delivery_events( - msg_delivery_event_id INTEGER PRIMARY KEY, - msg_delivery_id INTEGER NOT NULL REFERENCES msg_deliveries ON DELETE CASCADE, - delivery_status TEXT NOT NULL, - created_at TEXT NOT NULL DEFAULT(datetime('now')), - updated_at TEXT NOT NULL DEFAULT(datetime('now')) -); CREATE TABLE chat_item_versions( -- contains versions only for edited chat items, including current version chat_item_version_id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -553,6 +534,18 @@ CREATE TABLE remote_controllers( dh_priv_key BLOB NOT NULL, -- last session DH key prev_dh_priv_key BLOB -- previous session DH key ); +CREATE TABLE IF NOT EXISTS "msg_deliveries"( + msg_delivery_id INTEGER PRIMARY KEY, + message_id INTEGER NOT NULL REFERENCES messages ON DELETE CASCADE, -- non UNIQUE for group messages and for batched messages + connection_id INTEGER NOT NULL REFERENCES connections ON DELETE CASCADE, + agent_msg_id INTEGER, -- internal agent message ID(NULL while pending), non UNIQUE for batched messages + agent_msg_meta TEXT, -- JSON with timestamps etc. sent in MSG, NULL for sent + chat_ts TEXT NOT NULL DEFAULT(datetime('now')), + created_at TEXT CHECK(created_at NOT NULL), + updated_at TEXT CHECK(updated_at NOT NULL), + agent_ack_cmd_id INTEGER, -- broker_ts for received, created_at for sent + delivery_status TEXT -- MsgDeliveryStatus +); CREATE INDEX contact_profiles_index ON contact_profiles( display_name, full_name @@ -584,7 +577,6 @@ CREATE UNIQUE INDEX idx_chat_items_group_shared_msg_id ON chat_items( group_member_id, shared_msg_id ); -CREATE INDEX idx_msg_deliveries_message_id ON msg_deliveries(message_id); CREATE UNIQUE INDEX idx_user_contact_links_group_id ON user_contact_links( group_id ); @@ -716,13 +708,6 @@ CREATE INDEX idx_chat_items_timed_delete_at ON chat_items( timed_delete_at ); CREATE INDEX idx_group_members_group_id ON group_members(user_id, group_id); -CREATE INDEX idx_msg_deliveries_agent_ack_cmd_id ON msg_deliveries( - connection_id, - agent_ack_cmd_id -); -CREATE INDEX msg_delivery_events_msg_delivery_id ON msg_delivery_events( - msg_delivery_id -); CREATE INDEX idx_chat_item_moderations_group_id ON chat_item_moderations( group_id ); @@ -810,3 +795,12 @@ CREATE UNIQUE INDEX idx_remote_hosts_host_fingerprint ON remote_hosts( CREATE UNIQUE INDEX idx_remote_controllers_ctrl_fingerprint ON remote_controllers( ctrl_fingerprint ); +CREATE INDEX idx_msg_deliveries_message_id ON "msg_deliveries"(message_id); +CREATE INDEX idx_msg_deliveries_agent_ack_cmd_id ON "msg_deliveries"( + connection_id, + agent_ack_cmd_id +); +CREATE INDEX idx_msg_deliveries_agent_msg_id ON "msg_deliveries"( + connection_id, + agent_msg_id +); diff --git a/src/Simplex/Chat/Store/Messages.hs b/src/Simplex/Chat/Store/Messages.hs index 102612b4ee..1c79a9f27b 100644 --- a/src/Simplex/Chat/Store/Messages.hs +++ b/src/Simplex/Chat/Store/Messages.hs @@ -23,8 +23,8 @@ module Simplex.Chat.Store.Messages createSndMsgDelivery, createNewMessageAndRcvMsgDelivery, createNewRcvMessage, - createSndMsgDeliveryEvent, - createRcvMsgDeliveryEvent, + updateSndMsgDeliveryStatus, + updateRcvMsgDeliveryStatus, createPendingGroupMessage, getPendingGroupMessages, deletePendingGroupMessage, @@ -179,11 +179,17 @@ createNewSndMessage db gVar connOrGroupId mkMessage = GroupId groupId -> (Nothing, Just groupId) createSndMsgDelivery :: DB.Connection -> SndMsgDelivery -> MessageId -> IO Int64 -createSndMsgDelivery db sndMsgDelivery messageId = do +createSndMsgDelivery db SndMsgDelivery {connId, agentMsgId} messageId = do currentTs <- getCurrentTime - msgDeliveryId <- createSndMsgDelivery_ db sndMsgDelivery messageId currentTs - createMsgDeliveryEvent_ db msgDeliveryId MDSSndAgent currentTs - pure msgDeliveryId + DB.execute + db + [sql| + INSERT INTO msg_deliveries + (message_id, connection_id, agent_msg_id, chat_ts, created_at, updated_at, delivery_status) + VALUES (?,?,?,?,?,?,?) + |] + (messageId, connId, agentMsgId, currentTs, currentTs, currentTs, MDSSndAgent) + insertedRowId db createNewMessageAndRcvMsgDelivery :: forall e. MsgEncodingI e => DB.Connection -> ConnOrGroupId -> NewMessage e -> Maybe SharedMsgId -> RcvMsgDelivery -> Maybe GroupMemberId -> ExceptT StoreError IO RcvMessage createNewMessageAndRcvMsgDelivery db connOrGroupId newMessage sharedMsgId_ RcvMsgDelivery {connId, agentMsgId, agentMsgMeta, agentAckCmdId} authorGroupMemberId_ = do @@ -192,10 +198,12 @@ createNewMessageAndRcvMsgDelivery db connOrGroupId newMessage sharedMsgId_ RcvMs currentTs <- getCurrentTime DB.execute db - "INSERT INTO msg_deliveries (message_id, connection_id, agent_msg_id, agent_msg_meta, agent_ack_cmd_id, chat_ts, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?)" - (msgId, connId, agentMsgId, msgMetaJson agentMsgMeta, agentAckCmdId, snd $ broker agentMsgMeta, currentTs, currentTs) - msgDeliveryId <- insertedRowId db - createMsgDeliveryEvent_ db msgDeliveryId MDSRcvAgent currentTs + [sql| + INSERT INTO msg_deliveries + (message_id, connection_id, agent_msg_id, agent_msg_meta, agent_ack_cmd_id, chat_ts, created_at, updated_at, delivery_status) + VALUES (?,?,?,?,?,?,?,?,?) + |] + (msgId, connId, agentMsgId, msgMetaJson agentMsgMeta, agentAckCmdId, snd $ broker agentMsgMeta, currentTs, currentTs, MDSRcvAgent) pure msg createNewRcvMessage :: forall e. MsgEncodingI e => DB.Connection -> ConnOrGroupId -> NewMessage e -> Maybe SharedMsgId -> Maybe GroupMemberId -> Maybe GroupMemberId -> ExceptT StoreError IO RcvMessage @@ -234,68 +242,29 @@ createNewRcvMessage db connOrGroupId NewMessage {chatMsgEvent, msgBody} sharedMs msgId <- insertedRowId db pure RcvMessage {msgId, chatMsgEvent = ACME (encoding @e) chatMsgEvent, sharedMsgId_, msgBody, authorMember, forwardedByMember} -createSndMsgDeliveryEvent :: DB.Connection -> Int64 -> AgentMsgId -> MsgDeliveryStatus 'MDSnd -> ExceptT StoreError IO () -createSndMsgDeliveryEvent db connId agentMsgId sndMsgDeliveryStatus = do - msgDeliveryId <- getMsgDeliveryId_ db connId agentMsgId - liftIO $ do - currentTs <- getCurrentTime - createMsgDeliveryEvent_ db msgDeliveryId sndMsgDeliveryStatus currentTs - -createRcvMsgDeliveryEvent :: DB.Connection -> Int64 -> CommandId -> MsgDeliveryStatus 'MDRcv -> IO () -createRcvMsgDeliveryEvent db connId cmdId rcvMsgDeliveryStatus = do - msgDeliveryId <- getMsgDeliveryIdByCmdId_ db connId cmdId - forM_ msgDeliveryId $ \mdId -> do - currentTs <- getCurrentTime - createMsgDeliveryEvent_ db mdId rcvMsgDeliveryStatus currentTs - -createSndMsgDelivery_ :: DB.Connection -> SndMsgDelivery -> MessageId -> UTCTime -> IO Int64 -createSndMsgDelivery_ db SndMsgDelivery {connId, agentMsgId} messageId createdAt = do +updateSndMsgDeliveryStatus :: DB.Connection -> Int64 -> AgentMsgId -> MsgDeliveryStatus 'MDSnd -> IO () +updateSndMsgDeliveryStatus db connId agentMsgId sndMsgDeliveryStatus = do + currentTs <- getCurrentTime DB.execute db [sql| - INSERT INTO msg_deliveries - (message_id, connection_id, agent_msg_id, agent_msg_meta, chat_ts, created_at, updated_at) - VALUES (?,?,?,NULL,?,?,?) + UPDATE msg_deliveries + SET delivery_status = ?, updated_at = ? + WHERE connection_id = ? AND agent_msg_id = ? |] - (messageId, connId, agentMsgId, createdAt, createdAt, createdAt) - insertedRowId db + (sndMsgDeliveryStatus, currentTs, connId, agentMsgId) -createMsgDeliveryEvent_ :: DB.Connection -> Int64 -> MsgDeliveryStatus d -> UTCTime -> IO () -createMsgDeliveryEvent_ db msgDeliveryId msgDeliveryStatus createdAt = do +updateRcvMsgDeliveryStatus :: DB.Connection -> Int64 -> CommandId -> MsgDeliveryStatus 'MDRcv -> IO () +updateRcvMsgDeliveryStatus db connId cmdId rcvMsgDeliveryStatus = do + currentTs <- getCurrentTime DB.execute db [sql| - INSERT INTO msg_delivery_events - (msg_delivery_id, delivery_status, created_at, updated_at) - VALUES (?,?,?,?) + UPDATE msg_deliveries + SET delivery_status = ?, updated_at = ? + WHERE connection_id = ? AND agent_ack_cmd_id = ? |] - (msgDeliveryId, msgDeliveryStatus, createdAt, createdAt) - -getMsgDeliveryId_ :: DB.Connection -> Int64 -> AgentMsgId -> ExceptT StoreError IO Int64 -getMsgDeliveryId_ db connId agentMsgId = - ExceptT . firstRow fromOnly (SENoMsgDelivery connId agentMsgId) $ - DB.query - db - [sql| - SELECT msg_delivery_id - FROM msg_deliveries m - WHERE m.connection_id = ? AND m.agent_msg_id = ? - LIMIT 1 - |] - (connId, agentMsgId) - -getMsgDeliveryIdByCmdId_ :: DB.Connection -> Int64 -> CommandId -> IO (Maybe AgentMsgId) -getMsgDeliveryIdByCmdId_ db connId cmdId = - maybeFirstRow fromOnly $ - DB.query - db - [sql| - SELECT msg_delivery_id - FROM msg_deliveries - WHERE connection_id = ? AND agent_ack_cmd_id = ? - LIMIT 1 - |] - (connId, cmdId) + (rcvMsgDeliveryStatus, currentTs, connId, cmdId) createPendingGroupMessage :: DB.Connection -> Int64 -> MessageId -> Maybe Int64 -> IO () createPendingGroupMessage db groupMemberId messageId introId_ = do diff --git a/src/Simplex/Chat/Store/Migrations.hs b/src/Simplex/Chat/Store/Migrations.hs index 2273d17325..f143299e28 100644 --- a/src/Simplex/Chat/Store/Migrations.hs +++ b/src/Simplex/Chat/Store/Migrations.hs @@ -182,8 +182,8 @@ schemaMigrations = ("20231107_indexes", m20231107_indexes, Just down_m20231107_indexes), ("20231113_group_forward", m20231113_group_forward, Just down_m20231113_group_forward), ("20231114_remote_control", m20231114_remote_control, Just down_m20231114_remote_control), - ("20231126_remote_ctrl_address", m20231126_remote_ctrl_address, Just down_m20231126_remote_ctrl_address) - -- ("20231206_recreate_msg_deliveries", m20231206_recreate_msg_deliveries, Just down_m20231206_recreate_msg_deliveries) + ("20231126_remote_ctrl_address", m20231126_remote_ctrl_address, Just down_m20231126_remote_ctrl_address), + ("20231206_recreate_msg_deliveries", m20231206_recreate_msg_deliveries, Just down_m20231206_recreate_msg_deliveries) ] -- | The list of migrations in ascending order by date diff --git a/src/Simplex/Chat/Store/Shared.hs b/src/Simplex/Chat/Store/Shared.hs index 93c3ab197c..b8f84323dc 100644 --- a/src/Simplex/Chat/Store/Shared.hs +++ b/src/Simplex/Chat/Store/Shared.hs @@ -86,7 +86,6 @@ data StoreError | SEIntroNotFound | SEUniqueID | SEInternalError {message :: String} - | SENoMsgDelivery {connId :: Int64, agentMsgId :: AgentMsgId} | SEBadChatItem {itemId :: ChatItemId} | SEChatItemNotFound {itemId :: ChatItemId} | SEChatItemNotFoundByText {text :: Text} diff --git a/tests/SchemaDump.hs b/tests/SchemaDump.hs index f517d13df1..2f89cd850e 100644 --- a/tests/SchemaDump.hs +++ b/tests/SchemaDump.hs @@ -73,7 +73,9 @@ skipComparisonForDownMigrations = -- table and index definitions move down the file, so fields are re-created as not unique "20230914_member_probes", -- on down migration idx_connections_via_contact_uri_hash index moves down to the end of the file - "20231019_indexes" + "20231019_indexes", + -- table and indexes move down to the end of the file + "20231206_recreate_msg_deliveries" ] getSchema :: FilePath -> FilePath -> IO String