refactor code to work with updated schema

This commit is contained in:
spaced4ndy
2023-12-07 18:38:39 +04:00
parent d48ae37193
commit bf0babdb93
7 changed files with 66 additions and 104 deletions
+7 -9
View File
@@ -4019,15 +4019,11 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do
ackMsgDeliveryEvent :: Connection -> CommandId -> m ()
ackMsgDeliveryEvent Connection {connId} ackCmdId =
withStoreCtx'
(Just $ "createRcvMsgDeliveryEvent, connId: " <> show connId <> ", ackCmdId: " <> show ackCmdId <> ", msgDeliveryStatus: MDSRcvAcknowledged")
$ \db -> createRcvMsgDeliveryEvent db connId ackCmdId MDSRcvAcknowledged
withStore' $ \db -> updateRcvMsgDeliveryStatus db connId ackCmdId MDSRcvAcknowledged
sentMsgDeliveryEvent :: Connection -> AgentMsgId -> m ()
sentMsgDeliveryEvent Connection {connId} msgId =
withStoreCtx
(Just $ "createSndMsgDeliveryEvent, connId: " <> show connId <> ", msgId: " <> show msgId <> ", msgDeliveryStatus: MDSSndSent")
$ \db -> createSndMsgDeliveryEvent db connId msgId MDSSndSent
withStore' $ \db -> updateSndMsgDeliveryStatus db connId msgId MDSSndSent
agentErrToItemStatus :: AgentErrorType -> CIStatus 'MDSnd
agentErrToItemStatus (SMP AUTH) = CISSndErrorAuth
@@ -5236,18 +5232,20 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do
XGrpInfo p' -> xGrpInfo gInfo author p' rcvMsg msgTs
_ -> messageError $ "x.grp.msg.forward: unsupported forwarded event " <> T.pack (show $ toCMEventTag event)
-- TODO [batch send] update status of all messages in batch (getChatItemIdByAgentMsgId to return [ChatItemId])
directMsgReceived :: Contact -> Connection -> MsgMeta -> NonEmpty MsgReceipt -> m ()
directMsgReceived ct conn@Connection {connId} msgMeta msgRcpts = do
checkIntegrityCreateItem (CDDirectRcv ct) msgMeta
forM_ msgRcpts $ \MsgReceipt {agentMsgId, msgRcptStatus} -> do
withStore $ \db -> createSndMsgDeliveryEvent db connId agentMsgId $ MDSSndRcvd msgRcptStatus
withStore' $ \db -> updateSndMsgDeliveryStatus db connId agentMsgId $ MDSSndRcvd msgRcptStatus
updateDirectItemStatus ct conn agentMsgId $ CISSndRcvd msgRcptStatus SSPComplete
-- TODO [batch send] same as for directMsgReceived
groupMsgReceived :: GroupInfo -> GroupMember -> Connection -> MsgMeta -> NonEmpty MsgReceipt -> m ()
groupMsgReceived gInfo m conn@Connection {connId} msgMeta msgRcpts = do
checkIntegrityCreateItem (CDGroupRcv gInfo m) msgMeta
forM_ msgRcpts $ \MsgReceipt {agentMsgId, msgRcptStatus} -> do
withStore $ \db -> createSndMsgDeliveryEvent db connId agentMsgId $ MDSSndRcvd msgRcptStatus
withStore' $ \db -> updateSndMsgDeliveryStatus db connId agentMsgId $ MDSSndRcvd msgRcptStatus
updateGroupItemStatus gInfo m conn agentMsgId $ CISSndRcvd msgRcptStatus SSPComplete
updateDirectItemStatus :: Contact -> Connection -> AgentMsgId -> CIStatus 'MDSnd -> m ()
@@ -5639,7 +5637,7 @@ sendPendingGroupMessages user GroupMember {groupMemberId, localDisplayName} conn
saveDirectRcvMSG :: ChatMonad m => Connection -> MsgMeta -> CommandId -> MsgBody -> m (Connection, RcvMessage)
saveDirectRcvMSG conn@Connection {connId} agentMsgMeta agentAckCmdId msgBody = do
-- TODO [batch send] refactor direct message processing same as groups
case (parseChatMessages msgBody) of
case parseChatMessages msgBody of
[Right (ACMsg _ ChatMessage {chatVRange, msgId = sharedMsgId_, chatMsgEvent})] -> do
conn' <- updatePeerChatVRange conn chatVRange
let agentMsgId = fst $ recipient agentMsgMeta
@@ -16,7 +16,7 @@ DROP INDEX idx_msg_deliveries_agent_ack_cmd_id;
CREATE TABLE new_msg_deliveries(
msg_delivery_id INTEGER PRIMARY KEY,
message_id INTEGER NOT NULL REFERENCES messages ON DELETE CASCADE, -- non UNIQUE for group messages
message_id INTEGER NOT NULL REFERENCES messages ON DELETE CASCADE, -- non UNIQUE for group messages and for batched messages
connection_id INTEGER NOT NULL REFERENCES connections ON DELETE CASCADE,
agent_msg_id INTEGER, -- internal agent message ID (NULL while pending), non UNIQUE for batched messages
agent_msg_meta TEXT, -- JSON with timestamps etc. sent in MSG, NULL for sent
+21 -27
View File
@@ -330,18 +330,6 @@ CREATE TABLE messages(
author_group_member_id INTEGER REFERENCES group_members ON DELETE SET NULL,
forwarded_by_group_member_id INTEGER REFERENCES group_members ON DELETE SET NULL
);
CREATE TABLE msg_deliveries(
msg_delivery_id INTEGER PRIMARY KEY,
message_id INTEGER NOT NULL REFERENCES messages ON DELETE CASCADE, -- non UNIQUE for group messages
connection_id INTEGER NOT NULL REFERENCES connections ON DELETE CASCADE,
agent_msg_id INTEGER, -- internal agent message ID(NULL while pending)
agent_msg_meta TEXT, -- JSON with timestamps etc. sent in MSG, NULL for sent
chat_ts TEXT NOT NULL DEFAULT(datetime('now')),
created_at TEXT CHECK(created_at NOT NULL),
updated_at TEXT CHECK(updated_at NOT NULL),
agent_ack_cmd_id INTEGER, -- broker_ts for received, created_at for sent
UNIQUE(connection_id, agent_msg_id)
);
CREATE TABLE pending_group_messages(
pending_group_message_id INTEGER PRIMARY KEY,
group_member_id INTEGER NOT NULL REFERENCES group_members ON DELETE CASCADE,
@@ -449,13 +437,6 @@ CREATE TABLE extra_xftp_file_descriptions(
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
CREATE TABLE msg_delivery_events(
msg_delivery_event_id INTEGER PRIMARY KEY,
msg_delivery_id INTEGER NOT NULL REFERENCES msg_deliveries ON DELETE CASCADE,
delivery_status TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
CREATE TABLE chat_item_versions(
-- contains versions only for edited chat items, including current version
chat_item_version_id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -553,6 +534,18 @@ CREATE TABLE remote_controllers(
dh_priv_key BLOB NOT NULL, -- last session DH key
prev_dh_priv_key BLOB -- previous session DH key
);
CREATE TABLE IF NOT EXISTS "msg_deliveries"(
msg_delivery_id INTEGER PRIMARY KEY,
message_id INTEGER NOT NULL REFERENCES messages ON DELETE CASCADE, -- non UNIQUE for group messages and for batched messages
connection_id INTEGER NOT NULL REFERENCES connections ON DELETE CASCADE,
agent_msg_id INTEGER, -- internal agent message ID(NULL while pending), non UNIQUE for batched messages
agent_msg_meta TEXT, -- JSON with timestamps etc. sent in MSG, NULL for sent
chat_ts TEXT NOT NULL DEFAULT(datetime('now')),
created_at TEXT CHECK(created_at NOT NULL),
updated_at TEXT CHECK(updated_at NOT NULL),
agent_ack_cmd_id INTEGER, -- broker_ts for received, created_at for sent
delivery_status TEXT -- MsgDeliveryStatus
);
CREATE INDEX contact_profiles_index ON contact_profiles(
display_name,
full_name
@@ -584,7 +577,6 @@ CREATE UNIQUE INDEX idx_chat_items_group_shared_msg_id ON chat_items(
group_member_id,
shared_msg_id
);
CREATE INDEX idx_msg_deliveries_message_id ON msg_deliveries(message_id);
CREATE UNIQUE INDEX idx_user_contact_links_group_id ON user_contact_links(
group_id
);
@@ -716,13 +708,6 @@ CREATE INDEX idx_chat_items_timed_delete_at ON chat_items(
timed_delete_at
);
CREATE INDEX idx_group_members_group_id ON group_members(user_id, group_id);
CREATE INDEX idx_msg_deliveries_agent_ack_cmd_id ON msg_deliveries(
connection_id,
agent_ack_cmd_id
);
CREATE INDEX msg_delivery_events_msg_delivery_id ON msg_delivery_events(
msg_delivery_id
);
CREATE INDEX idx_chat_item_moderations_group_id ON chat_item_moderations(
group_id
);
@@ -810,3 +795,12 @@ CREATE UNIQUE INDEX idx_remote_hosts_host_fingerprint ON remote_hosts(
CREATE UNIQUE INDEX idx_remote_controllers_ctrl_fingerprint ON remote_controllers(
ctrl_fingerprint
);
CREATE INDEX idx_msg_deliveries_message_id ON "msg_deliveries"(message_id);
CREATE INDEX idx_msg_deliveries_agent_ack_cmd_id ON "msg_deliveries"(
connection_id,
agent_ack_cmd_id
);
CREATE INDEX idx_msg_deliveries_agent_msg_id ON "msg_deliveries"(
connection_id,
agent_msg_id
);
+32 -63
View File
@@ -23,8 +23,8 @@ module Simplex.Chat.Store.Messages
createSndMsgDelivery,
createNewMessageAndRcvMsgDelivery,
createNewRcvMessage,
createSndMsgDeliveryEvent,
createRcvMsgDeliveryEvent,
updateSndMsgDeliveryStatus,
updateRcvMsgDeliveryStatus,
createPendingGroupMessage,
getPendingGroupMessages,
deletePendingGroupMessage,
@@ -179,11 +179,17 @@ createNewSndMessage db gVar connOrGroupId mkMessage =
GroupId groupId -> (Nothing, Just groupId)
createSndMsgDelivery :: DB.Connection -> SndMsgDelivery -> MessageId -> IO Int64
createSndMsgDelivery db sndMsgDelivery messageId = do
createSndMsgDelivery db SndMsgDelivery {connId, agentMsgId} messageId = do
currentTs <- getCurrentTime
msgDeliveryId <- createSndMsgDelivery_ db sndMsgDelivery messageId currentTs
createMsgDeliveryEvent_ db msgDeliveryId MDSSndAgent currentTs
pure msgDeliveryId
DB.execute
db
[sql|
INSERT INTO msg_deliveries
(message_id, connection_id, agent_msg_id, chat_ts, created_at, updated_at, delivery_status)
VALUES (?,?,?,?,?,?,?)
|]
(messageId, connId, agentMsgId, currentTs, currentTs, currentTs, MDSSndAgent)
insertedRowId db
createNewMessageAndRcvMsgDelivery :: forall e. MsgEncodingI e => DB.Connection -> ConnOrGroupId -> NewMessage e -> Maybe SharedMsgId -> RcvMsgDelivery -> Maybe GroupMemberId -> ExceptT StoreError IO RcvMessage
createNewMessageAndRcvMsgDelivery db connOrGroupId newMessage sharedMsgId_ RcvMsgDelivery {connId, agentMsgId, agentMsgMeta, agentAckCmdId} authorGroupMemberId_ = do
@@ -192,10 +198,12 @@ createNewMessageAndRcvMsgDelivery db connOrGroupId newMessage sharedMsgId_ RcvMs
currentTs <- getCurrentTime
DB.execute
db
"INSERT INTO msg_deliveries (message_id, connection_id, agent_msg_id, agent_msg_meta, agent_ack_cmd_id, chat_ts, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?)"
(msgId, connId, agentMsgId, msgMetaJson agentMsgMeta, agentAckCmdId, snd $ broker agentMsgMeta, currentTs, currentTs)
msgDeliveryId <- insertedRowId db
createMsgDeliveryEvent_ db msgDeliveryId MDSRcvAgent currentTs
[sql|
INSERT INTO msg_deliveries
(message_id, connection_id, agent_msg_id, agent_msg_meta, agent_ack_cmd_id, chat_ts, created_at, updated_at, delivery_status)
VALUES (?,?,?,?,?,?,?,?,?)
|]
(msgId, connId, agentMsgId, msgMetaJson agentMsgMeta, agentAckCmdId, snd $ broker agentMsgMeta, currentTs, currentTs, MDSRcvAgent)
pure msg
createNewRcvMessage :: forall e. MsgEncodingI e => DB.Connection -> ConnOrGroupId -> NewMessage e -> Maybe SharedMsgId -> Maybe GroupMemberId -> Maybe GroupMemberId -> ExceptT StoreError IO RcvMessage
@@ -234,68 +242,29 @@ createNewRcvMessage db connOrGroupId NewMessage {chatMsgEvent, msgBody} sharedMs
msgId <- insertedRowId db
pure RcvMessage {msgId, chatMsgEvent = ACME (encoding @e) chatMsgEvent, sharedMsgId_, msgBody, authorMember, forwardedByMember}
createSndMsgDeliveryEvent :: DB.Connection -> Int64 -> AgentMsgId -> MsgDeliveryStatus 'MDSnd -> ExceptT StoreError IO ()
createSndMsgDeliveryEvent db connId agentMsgId sndMsgDeliveryStatus = do
msgDeliveryId <- getMsgDeliveryId_ db connId agentMsgId
liftIO $ do
currentTs <- getCurrentTime
createMsgDeliveryEvent_ db msgDeliveryId sndMsgDeliveryStatus currentTs
createRcvMsgDeliveryEvent :: DB.Connection -> Int64 -> CommandId -> MsgDeliveryStatus 'MDRcv -> IO ()
createRcvMsgDeliveryEvent db connId cmdId rcvMsgDeliveryStatus = do
msgDeliveryId <- getMsgDeliveryIdByCmdId_ db connId cmdId
forM_ msgDeliveryId $ \mdId -> do
currentTs <- getCurrentTime
createMsgDeliveryEvent_ db mdId rcvMsgDeliveryStatus currentTs
createSndMsgDelivery_ :: DB.Connection -> SndMsgDelivery -> MessageId -> UTCTime -> IO Int64
createSndMsgDelivery_ db SndMsgDelivery {connId, agentMsgId} messageId createdAt = do
updateSndMsgDeliveryStatus :: DB.Connection -> Int64 -> AgentMsgId -> MsgDeliveryStatus 'MDSnd -> IO ()
updateSndMsgDeliveryStatus db connId agentMsgId sndMsgDeliveryStatus = do
currentTs <- getCurrentTime
DB.execute
db
[sql|
INSERT INTO msg_deliveries
(message_id, connection_id, agent_msg_id, agent_msg_meta, chat_ts, created_at, updated_at)
VALUES (?,?,?,NULL,?,?,?)
UPDATE msg_deliveries
SET delivery_status = ?, updated_at = ?
WHERE connection_id = ? AND agent_msg_id = ?
|]
(messageId, connId, agentMsgId, createdAt, createdAt, createdAt)
insertedRowId db
(sndMsgDeliveryStatus, currentTs, connId, agentMsgId)
createMsgDeliveryEvent_ :: DB.Connection -> Int64 -> MsgDeliveryStatus d -> UTCTime -> IO ()
createMsgDeliveryEvent_ db msgDeliveryId msgDeliveryStatus createdAt = do
updateRcvMsgDeliveryStatus :: DB.Connection -> Int64 -> CommandId -> MsgDeliveryStatus 'MDRcv -> IO ()
updateRcvMsgDeliveryStatus db connId cmdId rcvMsgDeliveryStatus = do
currentTs <- getCurrentTime
DB.execute
db
[sql|
INSERT INTO msg_delivery_events
(msg_delivery_id, delivery_status, created_at, updated_at)
VALUES (?,?,?,?)
UPDATE msg_deliveries
SET delivery_status = ?, updated_at = ?
WHERE connection_id = ? AND agent_ack_cmd_id = ?
|]
(msgDeliveryId, msgDeliveryStatus, createdAt, createdAt)
getMsgDeliveryId_ :: DB.Connection -> Int64 -> AgentMsgId -> ExceptT StoreError IO Int64
getMsgDeliveryId_ db connId agentMsgId =
ExceptT . firstRow fromOnly (SENoMsgDelivery connId agentMsgId) $
DB.query
db
[sql|
SELECT msg_delivery_id
FROM msg_deliveries m
WHERE m.connection_id = ? AND m.agent_msg_id = ?
LIMIT 1
|]
(connId, agentMsgId)
getMsgDeliveryIdByCmdId_ :: DB.Connection -> Int64 -> CommandId -> IO (Maybe AgentMsgId)
getMsgDeliveryIdByCmdId_ db connId cmdId =
maybeFirstRow fromOnly $
DB.query
db
[sql|
SELECT msg_delivery_id
FROM msg_deliveries
WHERE connection_id = ? AND agent_ack_cmd_id = ?
LIMIT 1
|]
(connId, cmdId)
(rcvMsgDeliveryStatus, currentTs, connId, cmdId)
createPendingGroupMessage :: DB.Connection -> Int64 -> MessageId -> Maybe Int64 -> IO ()
createPendingGroupMessage db groupMemberId messageId introId_ = do
+2 -2
View File
@@ -182,8 +182,8 @@ schemaMigrations =
("20231107_indexes", m20231107_indexes, Just down_m20231107_indexes),
("20231113_group_forward", m20231113_group_forward, Just down_m20231113_group_forward),
("20231114_remote_control", m20231114_remote_control, Just down_m20231114_remote_control),
("20231126_remote_ctrl_address", m20231126_remote_ctrl_address, Just down_m20231126_remote_ctrl_address)
-- ("20231206_recreate_msg_deliveries", m20231206_recreate_msg_deliveries, Just down_m20231206_recreate_msg_deliveries)
("20231126_remote_ctrl_address", m20231126_remote_ctrl_address, Just down_m20231126_remote_ctrl_address),
("20231206_recreate_msg_deliveries", m20231206_recreate_msg_deliveries, Just down_m20231206_recreate_msg_deliveries)
]
-- | The list of migrations in ascending order by date
-1
View File
@@ -86,7 +86,6 @@ data StoreError
| SEIntroNotFound
| SEUniqueID
| SEInternalError {message :: String}
| SENoMsgDelivery {connId :: Int64, agentMsgId :: AgentMsgId}
| SEBadChatItem {itemId :: ChatItemId}
| SEChatItemNotFound {itemId :: ChatItemId}
| SEChatItemNotFoundByText {text :: Text}
+3 -1
View File
@@ -73,7 +73,9 @@ skipComparisonForDownMigrations =
-- table and index definitions move down the file, so fields are re-created as not unique
"20230914_member_probes",
-- on down migration idx_connections_via_contact_uri_hash index moves down to the end of the file
"20231019_indexes"
"20231019_indexes",
-- table and indexes move down to the end of the file
"20231206_recreate_msg_deliveries"
]
getSchema :: FilePath -> FilePath -> IO String