core: fix races on member removal / delivery of their messages (#7010)

This commit is contained in:
spaced4ndy
2026-05-26 09:07:26 +00:00
committed by GitHub
parent f3abb7aa76
commit 037c05cd29
13 changed files with 173 additions and 17 deletions
+9
View File
@@ -4761,6 +4761,8 @@ cleanupManager = do
liftIO $ threadDelay' stepDelay
cleanupStaleRelayTestConns user `catchAllErrors` eToView
liftIO $ threadDelay' stepDelay
cleanupRemovedMembers user `catchAllErrors` eToView
liftIO $ threadDelay' stepDelay
cleanupTimedItems cleanupInterval user = do
ts <- liftIO getCurrentTime
let startTimedThreadCutoff = addUTCTime cleanupInterval ts
@@ -4787,6 +4789,13 @@ cleanupManager = do
forM_ staleConns $ \acId -> do
deleteAgentConnectionAsync acId
withStore' $ \db -> deleteConnectionByAgentConnId db user acId
cleanupRemovedMembers user = do
vr <- chatVersionRange
ts <- liftIO getCurrentTime
let cutoffTs = addUTCTime (-nominalDay) ts
removedMembers <- withStore' $ \db -> getRemovedMembersToCleanup db vr user cutoffTs
forM_ removedMembers $ \m ->
withStore' (\db -> deleteGroupMember db user m) `catchAllErrors` eToView
cleanupMessages = do
ts <- liftIO getCurrentTime
let cutoffTs = addUTCTime (-(30 * nominalDay)) ts
+6 -2
View File
@@ -1848,7 +1848,9 @@ deleteOrUpdateMemberRecordIO db user@User {userId} gInfo m = do
else
checkGroupMemberHasItems db user m' >>= \case
Just _ -> updateGroupMemberStatus db userId m' GSMemRemoved
Nothing -> deleteGroupMember db user m'
Nothing
| useRelays' gInfo -> updateGroupMemberRemovedAt db user m'
| otherwise -> deleteGroupMember db user m'
pure gInfo'
-- Unlike deleteOrUpdateMemberRecord, skips checkGroupMemberHasItems.
@@ -1859,7 +1861,9 @@ fullyDeleteMemberRecord user gInfo m =
fullyDeleteMemberRecordIO :: DB.Connection -> User -> GroupInfo -> GroupMember -> IO GroupInfo
fullyDeleteMemberRecordIO db user gInfo m = do
(gInfo', m') <- deleteSupportChatIfExists db user gInfo m
deleteGroupMember db user m'
if useRelays' gInfo && not (isRelay m')
then updateGroupMemberRemovedAt db user m'
else deleteGroupMember db user m'
pure gInfo'
updateMemberRecordDeleted :: User -> GroupInfo -> GroupMember -> GroupMemberStatus -> CM GroupInfo
+8 -5
View File
@@ -3415,10 +3415,13 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
unknownRole <- unknownMemberRole gInfo
let allowCreate = toCMEventTag chatMsgEvent /= XGrpLeave_
withStore (\db -> getCreateUnknownGMByMemberId db vr user gInfo memberId memberName unknownRole allowCreate) >>= \case
Just (author, unknown) -> do
when unknown $ toView $ CEvtUnknownMemberCreated user gInfo m author
void $ withVerifiedMsg gInfo scopeInfo author parsedMsg msgTs $
(`processForwardedMsg` Just author)
Just (author, unknown)
| memberRemoved author ->
logInfo $ "x.grp.msg.forward: ignoring content from removed member, group " <> tshow (groupId' gInfo) <> ", member " <> safeDecodeUtf8 (strEncode memberId) <> ", event " <> tshow (toCMEventTag chatMsgEvent)
| otherwise -> do
when unknown $ toView $ CEvtUnknownMemberCreated user gInfo m author
void $ withVerifiedMsg gInfo scopeInfo author parsedMsg msgTs $
(`processForwardedMsg` Just author)
Nothing -> pure ()
FwdChannel -> processForwardedMsg (VMUnsigned chatMsg) Nothing
where
@@ -3732,7 +3735,7 @@ runDeliveryJobWorker a deliveryKey Worker {doWork} = do
senders <- withStore' $ \db ->
fmap catMaybes . forM senderGMIds $ \sId ->
fmap eitherToMaybe . runExceptT $ do
sender <- getGroupMemberById db vr user sId
sender <- getNonRemovedMemberById db vr user sId
vec <- getMemberRelationsVector db sender
pure (sender, vec)
let missingSenders = length senderGMIds - length senders
+31
View File
@@ -57,6 +57,7 @@ module Simplex.Chat.Store.Groups
getMentionedGroupMember,
getMentionedMemberByMemberId,
getGroupMemberById,
getNonRemovedMemberById,
getGroupMemberByIndex,
getGroupMemberByMemberId,
getCreateUnknownGMByMemberId,
@@ -68,6 +69,7 @@ module Simplex.Chat.Store.Groups
getGroupModerators,
getGroupRelayMembers,
getGroupMembersForExpiration,
getRemovedMembersToCleanup,
deleteGroupChatItems,
deleteGroupMembers,
cleanupHostGroupLinkConn,
@@ -116,6 +118,7 @@ module Simplex.Chat.Store.Groups
updateRelayGroupKeys,
updateGroupMemberStatus,
updateGroupMemberStatusById,
updateGroupMemberRemovedAt,
updateGroupMemberAccepted,
deleteGroupMemberSupportChat,
updateGroupMembersRequireAttention,
@@ -1092,6 +1095,14 @@ getGroupMemberById db vr user@User {userId} groupMemberId =
(groupMemberQuery <> " WHERE m.group_member_id = ? AND m.user_id = ?")
(groupMemberId, userId)
getNonRemovedMemberById :: DB.Connection -> VersionRangeChat -> User -> GroupMemberId -> ExceptT StoreError IO GroupMember
getNonRemovedMemberById db vr user@User {userId} groupMemberId =
ExceptT . firstRow (toContactMember vr user) (SEGroupMemberNotFound groupMemberId) $
DB.query
db
(groupMemberQuery <> " WHERE m.group_member_id = ? AND m.user_id = ? AND m.member_status NOT IN (?,?,?,?)")
(groupMemberId, userId, GSMemRejected, GSMemRemoved, GSMemLeft, GSMemGroupDeleted)
getGroupMemberByIndex :: DB.Connection -> VersionRangeChat -> User -> GroupInfo -> Int64 -> ExceptT StoreError IO GroupMember
getGroupMemberByIndex db vr user GroupInfo {groupId} indexInGroup =
ExceptT . firstRow (toContactMember vr user) (SEGroupMemberNotFoundByIndex indexInGroup) $
@@ -1209,6 +1220,14 @@ getGroupMembersForExpiration db vr user@User {userId, userContactId} GroupInfo {
)
(groupId, userId, userContactId, GSMemRemoved, GSMemLeft, GSMemGroupDeleted, GSMemUnknown)
getRemovedMembersToCleanup :: DB.Connection -> VersionRangeChat -> User -> UTCTime -> IO [GroupMember]
getRemovedMembersToCleanup db vr user@User {userId} cutoffTs =
map (toContactMember vr user)
<$> DB.query
db
(groupMemberQuery <> " WHERE m.user_id = ? AND m.removed_at < ?")
(userId, cutoffTs)
getGroupInvitation :: DB.Connection -> VersionRangeChat -> User -> GroupId -> ExceptT StoreError IO ReceivedGroupInvitation
getGroupInvitation db vr user groupId =
getConnRec_ user >>= \case
@@ -1955,6 +1974,18 @@ updateGroupMemberStatusById db userId groupMemberId memStatus = do
|]
(memStatus, currentTs, userId, groupMemberId)
updateGroupMemberRemovedAt :: DB.Connection -> User -> GroupMember -> IO ()
updateGroupMemberRemovedAt db User {userId} GroupMember {groupMemberId} = do
currentTs <- getCurrentTime
DB.execute
db
[sql|
UPDATE group_members
SET member_status = ?, removed_at = ?, updated_at = ?
WHERE user_id = ? AND group_member_id = ?
|]
(GSMemRemoved, currentTs, currentTs, userId, groupMemberId)
updateGroupMemberAccepted :: DB.Connection -> User -> GroupMember -> GroupMemberStatus -> GroupMemberRole -> IO GroupMember
updateGroupMemberAccepted db User {userId} m@GroupMember {groupMemberId} status role = do
currentTs <- getCurrentTime
@@ -33,6 +33,7 @@ import Simplex.Chat.Store.Postgres.Migrations.M20260507_relay_inactive_at
import Simplex.Chat.Store.Postgres.Migrations.M20260514_relay_request_group_link_index
import Simplex.Chat.Store.Postgres.Migrations.M20260515_delivery_job_senders
import Simplex.Chat.Store.Postgres.Migrations.M20260520_client_services
import Simplex.Chat.Store.Postgres.Migrations.M20260525_member_removed_at
import Simplex.Messaging.Agent.Store.Shared (Migration (..))
schemaMigrations :: [(String, Text, Maybe Text)]
@@ -65,7 +66,8 @@ schemaMigrations =
("20260507_relay_inactive_at", m20260507_relay_inactive_at, Just down_m20260507_relay_inactive_at),
("20260514_relay_request_group_link_index", m20260514_relay_request_group_link_index, Just down_m20260514_relay_request_group_link_index),
("20260515_delivery_job_senders", m20260515_delivery_job_senders, Just down_m20260515_delivery_job_senders),
("20260520_client_services", m20260520_client_services, Just down_m20260520_client_services)
("20260520_client_services", m20260520_client_services, Just down_m20260520_client_services),
("20260525_member_removed_at", m20260525_member_removed_at, Just down_m20260525_member_removed_at)
]
-- | The list of migrations in ascending order by date
@@ -0,0 +1,19 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
module Simplex.Chat.Store.Postgres.Migrations.M20260525_member_removed_at where
import Data.Text (Text)
import Text.RawString.QQ (r)
m20260525_member_removed_at :: Text
m20260525_member_removed_at =
[r|
ALTER TABLE group_members ADD COLUMN removed_at TIMESTAMPTZ;
|]
down_m20260525_member_removed_at :: Text
down_m20260525_member_removed_at =
[r|
ALTER TABLE group_members DROP COLUMN removed_at;
|]
@@ -818,7 +818,8 @@ CREATE TABLE test_chat_schema.group_members (
index_in_group bigint DEFAULT 0 NOT NULL,
member_relations_vector bytea,
relay_link bytea,
member_pub_key bytea
member_pub_key bytea,
removed_at timestamp with time zone
);
+3 -1
View File
@@ -156,6 +156,7 @@ import Simplex.Chat.Store.SQLite.Migrations.M20260507_relay_inactive_at
import Simplex.Chat.Store.SQLite.Migrations.M20260514_relay_request_group_link_index
import Simplex.Chat.Store.SQLite.Migrations.M20260515_delivery_job_senders
import Simplex.Chat.Store.SQLite.Migrations.M20260520_client_services
import Simplex.Chat.Store.SQLite.Migrations.M20260525_member_removed_at
import Simplex.Messaging.Agent.Store.Shared (Migration (..))
schemaMigrations :: [(String, Query, Maybe Query)]
@@ -311,7 +312,8 @@ schemaMigrations =
("20260507_relay_inactive_at", m20260507_relay_inactive_at, Just down_m20260507_relay_inactive_at),
("20260514_relay_request_group_link_index", m20260514_relay_request_group_link_index, Just down_m20260514_relay_request_group_link_index),
("20260515_delivery_job_senders", m20260515_delivery_job_senders, Just down_m20260515_delivery_job_senders),
("20260520_client_services", m20260520_client_services, Just down_m20260520_client_services)
("20260520_client_services", m20260520_client_services, Just down_m20260520_client_services),
("20260525_member_removed_at", m20260525_member_removed_at, Just down_m20260525_member_removed_at)
]
-- | The list of migrations in ascending order by date
@@ -0,0 +1,18 @@
{-# LANGUAGE QuasiQuotes #-}
module Simplex.Chat.Store.SQLite.Migrations.M20260525_member_removed_at where
import Database.SQLite.Simple (Query)
import Database.SQLite.Simple.QQ (sql)
m20260525_member_removed_at :: Query
m20260525_member_removed_at =
[sql|
ALTER TABLE group_members ADD COLUMN removed_at TEXT;
|]
down_m20260525_member_removed_at :: Query
down_m20260525_member_removed_at =
[sql|
ALTER TABLE group_members DROP COLUMN removed_at;
|]
@@ -5040,6 +5040,14 @@ Query:
Plan:
SEARCH group_members USING INTEGER PRIMARY KEY (rowid=?)
Query:
UPDATE group_members
SET member_status = ?, removed_at = ?, updated_at = ?
WHERE user_id = ? AND group_member_id = ?
Plan:
SEARCH group_members USING INTEGER PRIMARY KEY (rowid=?)
Query:
UPDATE group_members
SET member_status = ?, updated_at = ?
@@ -5564,6 +5572,25 @@ SEARCH m USING INTEGER PRIMARY KEY (rowid=?)
SEARCH p USING INTEGER PRIMARY KEY (rowid=?)
SEARCH c USING INDEX idx_connections_group_member_id (group_member_id=?) LEFT-JOIN
Query:
SELECT
m.group_member_id, m.group_id, m.index_in_group, m.member_id, m.peer_chat_min_version, m.peer_chat_max_version, m.member_role, m.member_category, m.member_status, m.show_messages, m.member_restriction,
m.invited_by, m.invited_by_group_member_id, m.local_display_name, m.contact_id, m.contact_profile_id, p.contact_profile_id, p.display_name, p.full_name, p.short_descr, p.image, p.contact_link, p.chat_peer_type, p.local_alias, p.preferences,
m.created_at, m.updated_at,
m.support_chat_ts, m.support_chat_items_unread, m.support_chat_items_member_attention, m.support_chat_items_mentions, m.support_chat_last_msg_from_member_ts, m.member_pub_key, m.relay_link,
c.connection_id, c.agent_conn_id, c.conn_level, c.via_contact, c.via_user_contact_link, c.via_group_link, c.group_link_id, c.xcontact_id, c.custom_user_profile_id,
c.conn_status, c.conn_type, c.contact_conn_initiated, c.local_alias, c.contact_id, c.group_member_id, c.user_contact_link_id,
c.created_at, c.security_code, c.security_code_verified_at, c.pq_support, c.pq_encryption, c.pq_snd_enabled, c.pq_rcv_enabled, c.auth_err_counter, c.quota_err_counter,
c.conn_chat_version, c.peer_chat_min_version, c.peer_chat_max_version
FROM group_members m
JOIN contact_profiles p ON p.contact_profile_id = COALESCE(m.member_profile_id, m.contact_profile_id)
LEFT JOIN connections c ON c.group_member_id = m.group_member_id
WHERE m.group_member_id = ? AND m.user_id = ? AND m.member_status NOT IN (?,?,?,?)
Plan:
SEARCH m USING INTEGER PRIMARY KEY (rowid=?)
SEARCH p USING INTEGER PRIMARY KEY (rowid=?)
SEARCH c USING INDEX idx_connections_group_member_id (group_member_id=?) LEFT-JOIN
Query:
SELECT
m.group_member_id, m.group_id, m.index_in_group, m.member_id, m.peer_chat_min_version, m.peer_chat_max_version, m.member_role, m.member_category, m.member_status, m.show_messages, m.member_restriction,
@@ -5621,6 +5648,25 @@ SEARCH m USING INDEX idx_group_members_group_id (user_id=? AND group_id=?)
SEARCH p USING INTEGER PRIMARY KEY (rowid=?)
SEARCH c USING INDEX idx_connections_group_member_id (group_member_id=?) LEFT-JOIN
Query:
SELECT
m.group_member_id, m.group_id, m.index_in_group, m.member_id, m.peer_chat_min_version, m.peer_chat_max_version, m.member_role, m.member_category, m.member_status, m.show_messages, m.member_restriction,
m.invited_by, m.invited_by_group_member_id, m.local_display_name, m.contact_id, m.contact_profile_id, p.contact_profile_id, p.display_name, p.full_name, p.short_descr, p.image, p.contact_link, p.chat_peer_type, p.local_alias, p.preferences,
m.created_at, m.updated_at,
m.support_chat_ts, m.support_chat_items_unread, m.support_chat_items_member_attention, m.support_chat_items_mentions, m.support_chat_last_msg_from_member_ts, m.member_pub_key, m.relay_link,
c.connection_id, c.agent_conn_id, c.conn_level, c.via_contact, c.via_user_contact_link, c.via_group_link, c.group_link_id, c.xcontact_id, c.custom_user_profile_id,
c.conn_status, c.conn_type, c.contact_conn_initiated, c.local_alias, c.contact_id, c.group_member_id, c.user_contact_link_id,
c.created_at, c.security_code, c.security_code_verified_at, c.pq_support, c.pq_encryption, c.pq_snd_enabled, c.pq_rcv_enabled, c.auth_err_counter, c.quota_err_counter,
c.conn_chat_version, c.peer_chat_min_version, c.peer_chat_max_version
FROM group_members m
JOIN contact_profiles p ON p.contact_profile_id = COALESCE(m.member_profile_id, m.contact_profile_id)
LEFT JOIN connections c ON c.group_member_id = m.group_member_id
WHERE m.user_id = ? AND m.removed_at < ?
Plan:
SEARCH m USING INDEX idx_group_members_user_id (user_id=?)
SEARCH p USING INTEGER PRIMARY KEY (rowid=?)
SEARCH c USING INDEX idx_connections_group_member_id (group_member_id=?) LEFT-JOIN
Query:
SELECT f.file_id, f.ci_file_status, f.file_path
FROM chat_items i
@@ -6898,6 +6944,10 @@ Query: SELECT member_status FROM group_members WHERE member_role = 'relay'
Plan:
SCAN group_members
Query: SELECT member_status, removed_at FROM group_members WHERE local_display_name = ?
Plan:
SCAN group_members
Query: SELECT member_xcontact_id, member_welcome_shared_msg_id FROM group_members WHERE user_id = ? AND group_id = ? AND group_member_id = ?
Plan:
SEARCH group_members USING INTEGER PRIMARY KEY (rowid=?)
@@ -222,6 +222,7 @@ CREATE TABLE group_members(
member_relations_vector BLOB,
relay_link BLOB,
member_pub_key BLOB,
removed_at TEXT,
FOREIGN KEY(user_id, local_display_name)
REFERENCES display_names(user_id, local_display_name)
ON DELETE CASCADE