mirror of
https://github.com/simplex-chat/simplex-chat.git
synced 2026-05-24 19:35:33 +00:00
core: finalize introductions -> member relations vector stage 2 migration (drop introductions) (#6490)
* core: finalize introductions -> member relations vector stage 2 migration (drop introductions) * remove comment * skip down migration check * fix * plans * postgres schema * skip down migration comparison * do not drop group_member_intros table, rename migrations --------- Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>
This commit is contained in:
+2
-2
@@ -123,7 +123,7 @@ library
|
||||
Simplex.Chat.Store.Postgres.Migrations.M20251007_connections_sync
|
||||
Simplex.Chat.Store.Postgres.Migrations.M20251017_chat_tags_cascade
|
||||
Simplex.Chat.Store.Postgres.Migrations.M20251117_member_relations_vector
|
||||
Simplex.Chat.Store.Postgres.Migrations.M20251128_member_relations_vector_stage_2
|
||||
Simplex.Chat.Store.Postgres.Migrations.M20251128_migrate_member_relations
|
||||
else
|
||||
exposed-modules:
|
||||
Simplex.Chat.Archive
|
||||
@@ -270,7 +270,7 @@ library
|
||||
Simplex.Chat.Store.SQLite.Migrations.M20251007_connections_sync
|
||||
Simplex.Chat.Store.SQLite.Migrations.M20251017_chat_tags_cascade
|
||||
Simplex.Chat.Store.SQLite.Migrations.M20251117_member_relations_vector
|
||||
Simplex.Chat.Store.SQLite.Migrations.M20251128_member_relations_vector_stage_2
|
||||
Simplex.Chat.Store.SQLite.Migrations.M20251128_migrate_member_relations
|
||||
other-modules:
|
||||
Paths_simplex_chat
|
||||
hs-source-dirs:
|
||||
|
||||
@@ -167,9 +167,6 @@ startChatController mainApp enableSndFiles = do
|
||||
runExceptT (syncConnections' users) >>= \case
|
||||
Left e -> liftIO $ putStrLn $ "Error synchronizing connections: " <> show e
|
||||
Right _ -> pure ()
|
||||
runExceptT migrateMemberRelations >>= \case
|
||||
Left e -> liftIO $ putStrLn $ "Error migrating member relations: " <> show e
|
||||
Right _ -> pure ()
|
||||
restoreCalls
|
||||
s <- asks agentAsync
|
||||
readTVarIO s >>= maybe (start s users) (pure . fst)
|
||||
@@ -181,10 +178,6 @@ startChatController mainApp enableSndFiles = do
|
||||
(userDiff, connDiff) <- withAgent (\a -> syncConnections a aUserIds connIds)
|
||||
withFastStore' setConnectionsSyncTs
|
||||
toView $ CEvtConnectionsDiff (AgentUserId <$> userDiff) (AgentConnId <$> connDiff)
|
||||
migrateMemberRelations =
|
||||
when mainApp $
|
||||
whenM (withStore' hasMembersWithoutVector) $
|
||||
void $ forkIO runRelationsVectorMigration
|
||||
start s users = do
|
||||
a1 <- async agentSubscriber
|
||||
a2 <-
|
||||
@@ -4176,21 +4169,6 @@ agentSubscriber = do
|
||||
|
||||
type AgentSubResult = Map ConnId (Either AgentErrorType (Maybe ClientServiceId))
|
||||
|
||||
runRelationsVectorMigration :: CM ()
|
||||
runRelationsVectorMigration = do
|
||||
liftIO $ threadDelay' 5000000 -- 5 seconds (initial delay)
|
||||
migrateMembers
|
||||
where
|
||||
stepDelay = 1000000 -- 1 second
|
||||
migrateMembers = flip catchAllErrors eToView $ do
|
||||
lift waitChatStartedAndActivated
|
||||
gmIds <- withStore' getGMsWithoutVectorIds
|
||||
forM_ gmIds $ \gmId -> do
|
||||
lift waitChatStartedAndActivated
|
||||
withStore' (`migrateMemberRelationsVector'` gmId) `catchAllErrors` eToView
|
||||
liftIO $ threadDelay' stepDelay
|
||||
unless (null gmIds) migrateMembers
|
||||
|
||||
cleanupManager :: CM ()
|
||||
cleanupManager = do
|
||||
interval <- asks (cleanupManagerInterval . config)
|
||||
|
||||
@@ -1030,11 +1030,11 @@ introduceToModerators vr user gInfo@GroupInfo {groupId} m@GroupMember {memberRol
|
||||
else XMsgNew $ MCSimple $ extMsgContent (MCText pendingReviewMessage) Nothing
|
||||
void $ sendDirectMemberMessage mConn msg groupId
|
||||
modMs <- withStore' $ \db -> getGroupModerators db vr user gInfo
|
||||
let rcpModMs = filter shouldIntroduce modMs
|
||||
introduceMember vr user gInfo m rcpModMs (Just $ MSMember $ memberId' m)
|
||||
let rcpModMs = filter shouldIntroduceToMod modMs
|
||||
introduceMember user gInfo m rcpModMs (Just $ MSMember $ memberId' m)
|
||||
where
|
||||
shouldIntroduce :: GroupMember -> Bool
|
||||
shouldIntroduce mem =
|
||||
shouldIntroduceToMod :: GroupMember -> Bool
|
||||
shouldIntroduceToMod mem =
|
||||
memberCurrent mem
|
||||
&& groupMemberId' mem /= groupMemberId' m
|
||||
&& maxVersion (memberChatVRange mem) >= groupKnockingVersion
|
||||
@@ -1042,42 +1042,33 @@ introduceToModerators vr user gInfo@GroupInfo {groupId} m@GroupMember {memberRol
|
||||
introduceToAll :: VersionRangeChat -> User -> GroupInfo -> GroupMember -> CM ()
|
||||
introduceToAll vr user gInfo m = do
|
||||
members <- withStore' $ \db -> getGroupMembers db vr user gInfo
|
||||
vector_ <- withStore' (`getMemberRelationsVector_` m)
|
||||
let recipients = filter (shouldIntroduce vector_) members
|
||||
introduceMember vr user gInfo m recipients Nothing
|
||||
where
|
||||
shouldIntroduce :: Maybe ByteString -> GroupMember -> Bool
|
||||
shouldIntroduce vector_ m' =
|
||||
memberCurrent m'
|
||||
&& groupMemberId' m' /= groupMemberId' m
|
||||
&& maybe True (\v -> getRelation (indexInGroup m') v == MRNew) vector_
|
||||
vector <- withStore (`getMemberRelationsVector` m)
|
||||
let recipients = filter (shouldIntroduce m vector) members
|
||||
introduceMember user gInfo m recipients Nothing
|
||||
|
||||
introduceToRemaining :: VersionRangeChat -> User -> GroupInfo -> GroupMember -> CM ()
|
||||
introduceToRemaining vr user gInfo m = do
|
||||
members <- withStore' $ \db -> getGroupMembers db vr user gInfo
|
||||
vector_ <- withStore' (`getMemberRelationsVector_` m)
|
||||
recipients <- filterRecipients vector_ members
|
||||
introduceMember vr user gInfo m recipients Nothing
|
||||
where
|
||||
filterRecipients :: Maybe ByteString -> [GroupMember] -> CM [GroupMember]
|
||||
filterRecipients vector_ members = do
|
||||
newRelation <- case vector_ of
|
||||
Nothing -> do
|
||||
introducedGMIds <- S.fromList <$> withStore' (`getIntroducedGroupMemberIds` m)
|
||||
pure $ \m' -> groupMemberId' m' `S.notMember` introducedGMIds
|
||||
Just vec -> pure $ \m' -> getRelation (indexInGroup m') vec == MRNew
|
||||
pure $ filter (\m' -> groupMemberId' m' /= groupMemberId' m && memberCurrent m' && newRelation m') members
|
||||
vector <- withStore (`getMemberRelationsVector` m)
|
||||
let recipients = filter (shouldIntroduce m vector) members
|
||||
introduceMember user gInfo m recipients Nothing
|
||||
|
||||
introduceMember :: VersionRangeChat -> User -> GroupInfo -> GroupMember -> [GroupMember] -> Maybe MsgScope -> CM ()
|
||||
introduceMember _ _ _ GroupMember {activeConn = Nothing} _ _ = throwChatError $ CEInternalError "member connection not active"
|
||||
introduceMember vr user gInfo@GroupInfo {groupId} toMember@GroupMember {activeConn = Just conn} introduceToMembers msgScope = do
|
||||
shouldIntroduce :: GroupMember -> ByteString -> GroupMember -> Bool
|
||||
shouldIntroduce m vec mem =
|
||||
memberCurrent mem
|
||||
&& groupMemberId' mem /= groupMemberId' m
|
||||
&& getRelation (indexInGroup mem) vec == MRNew
|
||||
|
||||
introduceMember :: User -> GroupInfo -> GroupMember -> [GroupMember] -> Maybe MsgScope -> CM ()
|
||||
introduceMember _ _ GroupMember {activeConn = Nothing} _ _ = throwChatError $ CEInternalError "member connection not active"
|
||||
introduceMember user gInfo@GroupInfo {groupId} toMember@GroupMember {activeConn = Just conn} introduceToMembers msgScope = do
|
||||
void . sendGroupMessage' user gInfo introduceToMembers $ XGrpMemNew (memberInfo gInfo toMember) msgScope
|
||||
sendIntroductions introduceToMembers
|
||||
where
|
||||
sendIntroductions reMembers = do
|
||||
updateToMemberVector reMembers
|
||||
reMembers' <- withStore' $ \db -> createIntrosOrUpdateVectors db vr reMembers toMember
|
||||
shuffledReMembers <- liftIO $ shuffleMembers reMembers'
|
||||
updateReMembersVectors reMembers
|
||||
shuffledReMembers <- liftIO $ shuffleMembers reMembers
|
||||
if toMember `supportsVersion` batchSendVersion
|
||||
then do
|
||||
let events = map memberIntro shuffledReMembers
|
||||
@@ -1089,6 +1080,10 @@ introduceMember vr user gInfo@GroupInfo {groupId} toMember@GroupMember {activeCo
|
||||
updateToMemberVector reMembers = do
|
||||
let relations = map (\GroupMember {indexInGroup} -> (indexInGroup, (IDReferencedIntroduced, MRIntroduced))) reMembers
|
||||
withStore' $ \db -> setMemberVectorNewRelations db toMember relations
|
||||
updateReMembersVectors :: [GroupMember] -> CM ()
|
||||
updateReMembersVectors reMembers = do
|
||||
let GroupMember {indexInGroup} = toMember
|
||||
withStore' $ \db -> setMembersVectorsNewRelation db reMembers indexInGroup IDSubjectIntroduced MRIntroduced
|
||||
memberIntro :: GroupMember -> ChatMsgEvent 'Json
|
||||
memberIntro reMember =
|
||||
let mInfo = memberInfo gInfo reMember
|
||||
@@ -2020,7 +2015,7 @@ sendGroupMessages_ _user gInfo@GroupInfo {groupId} recipientMembers events = do
|
||||
pendingReq SndMessage {msgId} = (groupMemberId, msgId)
|
||||
createPendingMsg :: DB.Connection -> (GroupMemberId, MessageId) -> IO (Either ChatError ())
|
||||
createPendingMsg db (groupMemberId, msgId) =
|
||||
createPendingGroupMessage db groupMemberId msgId Nothing $> Right ()
|
||||
createPendingGroupMessage db groupMemberId msgId $> Right ()
|
||||
|
||||
data MemberSendAction = MSASend Connection | MSASendBatched Connection | MSAPending | MSAForwarded
|
||||
|
||||
@@ -2083,32 +2078,25 @@ readyMemberConn GroupMember {groupMemberId, activeConn = Just conn@Connection {c
|
||||
| otherwise = Nothing
|
||||
readyMemberConn GroupMember {activeConn = Nothing} = Nothing
|
||||
|
||||
sendGroupMemberMessage :: MsgEncodingI e => GroupInfo -> GroupMember -> ChatMsgEvent e -> Maybe GroupMemberIntro -> CM () -> CM ()
|
||||
sendGroupMemberMessage gInfo@GroupInfo {groupId} m@GroupMember {groupMemberId} chatMsgEvent intro_ postDeliver = do
|
||||
sendGroupMemberMessage :: MsgEncodingI e => GroupInfo -> GroupMember -> ChatMsgEvent e -> CM ()
|
||||
sendGroupMemberMessage gInfo@GroupInfo {groupId} m@GroupMember {groupMemberId} chatMsgEvent = do
|
||||
msg <- createSndMessage chatMsgEvent (GroupId groupId)
|
||||
messageMember msg `catchAllErrors` eToView
|
||||
where
|
||||
messageMember :: SndMessage -> CM ()
|
||||
messageMember SndMessage {msgId, msgBody} = forM_ (memberSendAction gInfo (chatMsgEvent :| []) [m] m) $ \case
|
||||
MSASend conn -> deliverMessage conn (toCMEventTag chatMsgEvent) msgBody msgId >> postDeliver
|
||||
MSASendBatched conn -> deliverMessage conn (toCMEventTag chatMsgEvent) msgBody msgId >> postDeliver
|
||||
MSAPending -> withStore' $ \db -> createPendingGroupMessage db groupMemberId msgId (introId <$> intro_)
|
||||
MSASend conn -> void $ deliverMessage conn (toCMEventTag chatMsgEvent) msgBody msgId
|
||||
MSASendBatched conn -> void $ deliverMessage conn (toCMEventTag chatMsgEvent) msgBody msgId
|
||||
MSAPending -> withStore' $ \db -> createPendingGroupMessage db groupMemberId msgId
|
||||
MSAForwarded -> pure ()
|
||||
|
||||
-- TODO ensure order - pending messages interleave with user input messages
|
||||
sendPendingGroupMessages :: User -> GroupMember -> Connection -> CM ()
|
||||
sendPendingGroupMessages user GroupMember {groupMemberId} conn = do
|
||||
pgms <- withStore' $ \db -> getPendingGroupMessages db groupMemberId
|
||||
forM_ (L.nonEmpty pgms) $ \pgms' -> do
|
||||
let msgs = L.map (\(sndMsg, _, _) -> sndMsg) pgms'
|
||||
void $ batchSendConnMessages user conn MsgFlags {notification = True} msgs
|
||||
lift . void . withStoreBatch' $ \db -> L.map (\SndMessage {msgId} -> deletePendingGroupMessage db groupMemberId msgId) msgs
|
||||
lift . void . withStoreBatch' $ \db -> L.map (\(_, tag, introId_) -> updateIntro_ db tag introId_) pgms'
|
||||
where
|
||||
updateIntro_ :: DB.Connection -> ACMEventTag -> Maybe Int64 -> IO ()
|
||||
updateIntro_ db tag introId_ = case (tag, introId_) of
|
||||
(ACMEventTag _ XGrpMemFwd_, Just introId) -> updateIntroStatus db introId GMIntroInvForwarded
|
||||
_ -> pure ()
|
||||
msgs <- withStore' $ \db -> getPendingGroupMessages db groupMemberId
|
||||
forM_ (L.nonEmpty msgs) $ \msgs' -> do
|
||||
void $ batchSendConnMessages user conn MsgFlags {notification = True} msgs'
|
||||
lift . void . withStoreBatch' $ \db -> L.map (\SndMessage {msgId} -> deletePendingGroupMessage db groupMemberId msgId) msgs'
|
||||
|
||||
saveDirectRcvMSG :: MsgEncodingI e => Connection -> MsgMeta -> MsgBody -> ChatMessage e -> CM (Connection, RcvMessage)
|
||||
saveDirectRcvMSG conn@Connection {connId} agentMsgMeta msgBody ChatMessage {chatVRange, msgId = sharedMsgId_, chatMsgEvent} = do
|
||||
|
||||
@@ -2615,14 +2615,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
||||
GCInviteeMember ->
|
||||
withStore' (\db -> runExceptT $ getGroupMemberByMemberId db vr user gInfo memId) >>= \case
|
||||
Left _ -> messageError "x.grp.mem.inv error: referenced member does not exist"
|
||||
Right reMember -> do
|
||||
intro_ <- withStore' $ \db -> getIntroduction db reMember m
|
||||
update intro_ GMIntroInvReceived
|
||||
sendGroupMemberMessage gInfo reMember (XGrpMemFwd (memberInfo gInfo m) introInv) intro_ $
|
||||
update intro_ GMIntroInvForwarded
|
||||
where
|
||||
update (Just GroupMemberIntro {introId}) status = withStore' $ \db -> updateIntroStatus db introId status
|
||||
update Nothing _ = pure ()
|
||||
Right reMember -> sendGroupMemberMessage gInfo reMember $ XGrpMemFwd (memberInfo gInfo m) introInv
|
||||
_ -> messageError "x.grp.mem.inv can be only sent by invitee member"
|
||||
|
||||
xGrpMemFwd :: GroupInfo -> GroupMember -> MemberInfo -> IntroInvitation -> CM ()
|
||||
@@ -2718,8 +2711,6 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
||||
xGrpMemCon :: GroupInfo -> GroupMember -> MemberId -> CM ()
|
||||
xGrpMemCon gInfo sendingMem memId = do
|
||||
refMem <- withStore $ \db -> getGroupMemberByMemberId db vr user gInfo memId
|
||||
withStore' (`migrateMemberRelationsVector` sendingMem)
|
||||
withStore' (`migrateMemberRelationsVector` refMem)
|
||||
-- Updating vectors in separate transactions to avoid deadlocks.
|
||||
withStore $ \db -> setMemberVectorRelationConnected db sendingMem refMem MRSubjectConnected
|
||||
withStore $ \db -> setMemberVectorRelationConnected db refMem sendingMem MRReferencedConnected
|
||||
@@ -2783,7 +2774,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
||||
let GroupMember {memberId} = m
|
||||
memberName = Just $ memberShortenedName m
|
||||
event = XGrpMsgForward memberId memberName chatMsg brokerTs
|
||||
sendGroupMemberMessage gInfo member event Nothing (pure ())
|
||||
sendGroupMemberMessage gInfo member event
|
||||
|
||||
-- TODO [channels fwd] base on differentiation between groups and channels
|
||||
isUserGrpFwdRelay :: GroupInfo -> Bool
|
||||
@@ -3228,7 +3219,7 @@ runDeliveryJobWorker a deliveryKey Worker {doWork} = do
|
||||
unless (null ms) $ deliver body ms
|
||||
where
|
||||
buildMemberList sender = do
|
||||
vec <- withStore $ \db -> migrateGetMemberRelationsVector db sender
|
||||
vec <- withStore (`getMemberRelationsVector` sender)
|
||||
-- this excludes the sender
|
||||
let introducedMemsIdxs = getRelationsIndexes MRIntroduced vec
|
||||
case jobScope of
|
||||
|
||||
@@ -99,18 +99,10 @@ module Simplex.Chat.Store.Groups
|
||||
deleteGroupMember,
|
||||
deleteGroupMemberConnection,
|
||||
updateGroupMemberRole,
|
||||
createIntroductions,
|
||||
createIntrosOrUpdateVectors,
|
||||
setMemberVectorNewRelations,
|
||||
setMembersVectorsNewRelation,
|
||||
setMemberVectorRelationConnected,
|
||||
migrateGetMemberRelationsVector,
|
||||
migrateMemberRelationsVector,
|
||||
migrateMemberRelationsVector',
|
||||
getMemberRelationsVector_,
|
||||
updateIntroStatus,
|
||||
getIntroduction,
|
||||
getIntroducedGroupMemberIds,
|
||||
getMemberRelationsVector,
|
||||
createIntroReMember,
|
||||
createIntroToMemberContact,
|
||||
getMatchingContacts,
|
||||
@@ -151,8 +143,6 @@ module Simplex.Chat.Store.Groups
|
||||
setGroupChatTTL,
|
||||
getGroupChatTTL,
|
||||
getUserGroupsToExpire,
|
||||
hasMembersWithoutVector,
|
||||
getGMsWithoutVectorIds,
|
||||
updateGroupAlias,
|
||||
)
|
||||
where
|
||||
@@ -166,7 +156,6 @@ import Data.ByteString (ByteString)
|
||||
import qualified Data.ByteString as B
|
||||
import Data.Char (toLower)
|
||||
import Data.Either (rights)
|
||||
import Data.Foldable (foldrM)
|
||||
import Data.Int (Int64)
|
||||
import Data.List (partition, sortOn)
|
||||
import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing)
|
||||
@@ -1602,75 +1591,6 @@ updateGroupMemberRole :: DB.Connection -> User -> GroupMember -> GroupMemberRole
|
||||
updateGroupMemberRole db User {userId} GroupMember {groupMemberId} memRole =
|
||||
DB.execute db "UPDATE group_members SET member_role = ? WHERE user_id = ? AND group_member_id = ?" (memRole, userId, groupMemberId)
|
||||
|
||||
createIntroductions :: DB.Connection -> VersionChat -> [GroupMember] -> GroupMember -> IO [GroupMember]
|
||||
createIntroductions db chatV reMembers toMember
|
||||
| null reMembers = pure []
|
||||
| otherwise = do
|
||||
currentTs <- getCurrentTime
|
||||
catMaybes <$> mapM (createIntro_ currentTs) reMembers
|
||||
where
|
||||
createIntro_ :: UTCTime -> GroupMember -> IO (Maybe GroupMember)
|
||||
createIntro_ ts reMember =
|
||||
-- when members connect concurrently, host would try to create introductions between them in both directions;
|
||||
-- this check avoids creating second (redundant) introduction
|
||||
checkInverseIntro >>= \case
|
||||
Just _ -> pure Nothing
|
||||
Nothing -> do
|
||||
DB.execute
|
||||
db
|
||||
[sql|
|
||||
INSERT INTO group_member_intros
|
||||
(re_group_member_id, to_group_member_id, intro_status, intro_chat_protocol_version, created_at, updated_at)
|
||||
VALUES (?,?,?,?,?,?)
|
||||
|]
|
||||
(groupMemberId' reMember, groupMemberId' toMember, GMIntroPending, chatV, ts, ts)
|
||||
pure $ Just reMember
|
||||
where
|
||||
checkInverseIntro :: IO (Maybe Int64)
|
||||
checkInverseIntro =
|
||||
maybeFirstRow fromOnly $
|
||||
DB.query
|
||||
db
|
||||
"SELECT 1 FROM group_member_intros WHERE re_group_member_id = ? AND to_group_member_id = ? LIMIT 1"
|
||||
(groupMemberId' toMember, groupMemberId' reMember)
|
||||
|
||||
-- Create introductions for members without vectors and update vectors for members with vectors.
|
||||
-- Partitioning and updates happen in same transaction to avoid race conditions.
|
||||
createIntrosOrUpdateVectors :: DB.Connection -> VersionRangeChat -> [GroupMember] -> GroupMember -> IO [GroupMember]
|
||||
createIntrosOrUpdateVectors db vr reMembers toMember
|
||||
| null reMembers = pure []
|
||||
| otherwise = do
|
||||
(memsWithVec, memsWithoutVec) <- partitionByVector reMembers
|
||||
let GroupMember {indexInGroup} = toMember
|
||||
setMembersVectorsNewRelation db memsWithVec indexInGroup IDSubjectIntroduced MRIntroduced
|
||||
memsWithoutVec' <- createIntroductions db (maxVersion vr) memsWithoutVec toMember
|
||||
pure $ memsWithoutVec' <> memsWithVec
|
||||
where
|
||||
partitionByVector :: [GroupMember] -> IO ([GroupMember], [GroupMember])
|
||||
#if defined(dbPostgres)
|
||||
partitionByVector members = do
|
||||
let memberIds = map groupMemberId' members
|
||||
-- Lock rows first to ensure partitioning doesn't change in case of concurrent updates
|
||||
_ :: [Only Int] <-
|
||||
DB.query
|
||||
db
|
||||
"SELECT 1 FROM group_members WHERE group_member_id IN ? FOR UPDATE"
|
||||
(Only $ In memberIds)
|
||||
memberIdsWithVec <- S.fromList . map fromOnly <$>
|
||||
DB.query
|
||||
db
|
||||
"SELECT group_member_id FROM group_members WHERE group_member_id IN ? AND member_relations_vector IS NOT NULL"
|
||||
(Only $ In memberIds)
|
||||
pure $ partition (\m -> groupMemberId' m `S.member` memberIdsWithVec) members
|
||||
#else
|
||||
partitionByVector = foldrM checkMember ([], [])
|
||||
where
|
||||
checkMember m (withVec, withoutVec) = do
|
||||
hasVec <- isJust <$> maybeFirstRow fromOnly
|
||||
(DB.query db "SELECT 1 FROM group_members WHERE group_member_id = ? AND member_relations_vector IS NOT NULL" (Only $ groupMemberId' m) :: IO [Only Int64])
|
||||
pure $ if hasVec then (m : withVec, withoutVec) else (withVec, m : withoutVec)
|
||||
#endif
|
||||
|
||||
setMemberVectorNewRelations :: DB.Connection -> GroupMember -> [(Int64, (IntroductionDirection, MemberRelation))] -> IO ()
|
||||
setMemberVectorNewRelations db GroupMember {groupMemberId} relations = do
|
||||
v_ <- maybeFirstRow fromOnly $
|
||||
@@ -1735,100 +1655,14 @@ setMemberVectorRelationConnected db GroupMember {groupMemberId} GroupMember {ind
|
||||
|]
|
||||
(Binary v', currentTs, groupMemberId)
|
||||
|
||||
migrateGetMemberRelationsVector :: DB.Connection -> GroupMember -> ExceptT StoreError IO ByteString
|
||||
migrateGetMemberRelationsVector db m@GroupMember {groupMemberId} = do
|
||||
liftIO $ migrateMemberRelationsVector db m
|
||||
getMemberRelationsVector :: DB.Connection -> GroupMember -> ExceptT StoreError IO ByteString
|
||||
getMemberRelationsVector db GroupMember {groupMemberId} =
|
||||
ExceptT . firstRow fromOnly (SEGroupMemberNotFound groupMemberId) $
|
||||
DB.query
|
||||
db
|
||||
"SELECT member_relations_vector FROM group_members WHERE group_member_id = ?"
|
||||
(Only groupMemberId)
|
||||
|
||||
migrateMemberRelationsVector :: DB.Connection -> GroupMember -> IO ()
|
||||
migrateMemberRelationsVector db GroupMember {groupMemberId} =
|
||||
migrateMemberRelationsVector' db groupMemberId
|
||||
|
||||
migrateMemberRelationsVector' :: DB.Connection -> GroupMemberId -> IO ()
|
||||
migrateMemberRelationsVector' db groupMemberId = do
|
||||
currentTs <- liftIO getCurrentTime
|
||||
liftIO $ do
|
||||
#if defined(dbPostgres)
|
||||
-- Lock the row first to ensure computation runs only after lock is acquired
|
||||
_ :: [Only Int] <-
|
||||
DB.query
|
||||
db
|
||||
"SELECT 1 FROM group_members WHERE group_member_id = ? AND member_relations_vector IS NULL FOR UPDATE"
|
||||
(Only groupMemberId)
|
||||
#endif
|
||||
DB.execute
|
||||
db
|
||||
[sql|
|
||||
UPDATE group_members
|
||||
SET
|
||||
member_relations_vector = (
|
||||
SELECT migrate_relations_vector(idx, direction, intro_status)
|
||||
FROM (
|
||||
SELECT m.index_in_group AS idx, 0 AS direction, i.intro_status
|
||||
FROM group_member_intros i
|
||||
JOIN group_members m ON m.group_member_id = i.to_group_member_id
|
||||
WHERE i.re_group_member_id = group_members.group_member_id
|
||||
UNION ALL
|
||||
SELECT m.index_in_group AS idx, 1 AS direction, i.intro_status
|
||||
FROM group_member_intros i
|
||||
JOIN group_members m ON m.group_member_id = i.re_group_member_id
|
||||
WHERE i.to_group_member_id = group_members.group_member_id
|
||||
) AS relations
|
||||
),
|
||||
updated_at = ?
|
||||
WHERE group_member_id = ?
|
||||
AND member_relations_vector IS NULL
|
||||
|]
|
||||
(currentTs, groupMemberId)
|
||||
|
||||
getMemberRelationsVector_ :: DB.Connection -> GroupMember -> IO (Maybe ByteString)
|
||||
getMemberRelationsVector_ db GroupMember {groupMemberId} =
|
||||
maybeFirstRow fromOnly $
|
||||
DB.query
|
||||
db
|
||||
"SELECT member_relations_vector FROM group_members WHERE group_member_id = ?"
|
||||
(Only groupMemberId)
|
||||
|
||||
updateIntroStatus :: DB.Connection -> Int64 -> GroupMemberIntroStatus -> IO ()
|
||||
updateIntroStatus db introId introStatus = do
|
||||
currentTs <- getCurrentTime
|
||||
DB.execute
|
||||
db
|
||||
[sql|
|
||||
UPDATE group_member_intros
|
||||
SET intro_status = ?, updated_at = ?
|
||||
WHERE group_member_intro_id = ?
|
||||
|]
|
||||
(introStatus, currentTs, introId)
|
||||
|
||||
getIntroduction :: DB.Connection -> GroupMember -> GroupMember -> IO (Maybe GroupMemberIntro)
|
||||
getIntroduction db reMember toMember =
|
||||
maybeFirstRow toIntro $
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT group_member_intro_id, intro_status
|
||||
FROM group_member_intros
|
||||
WHERE re_group_member_id = ? AND to_group_member_id = ?
|
||||
|]
|
||||
(groupMemberId' reMember, groupMemberId' toMember)
|
||||
where
|
||||
toIntro :: (Int64, GroupMemberIntroStatus) -> GroupMemberIntro
|
||||
toIntro (introId, introStatus) =
|
||||
GroupMemberIntro {introId, reMember, toMember, introStatus}
|
||||
|
||||
getIntroducedGroupMemberIds :: DB.Connection -> GroupMember -> IO [GroupMemberId]
|
||||
getIntroducedGroupMemberIds db invitee =
|
||||
map fromOnly <$>
|
||||
DB.query
|
||||
db
|
||||
"SELECT re_group_member_id FROM group_member_intros WHERE to_group_member_id = ?"
|
||||
(Only $ groupMemberId' invitee)
|
||||
|
||||
createIntroReMember :: DB.Connection -> User -> GroupInfo -> GroupMember -> VersionChat -> MemberInfo -> Maybe MemberRestrictions -> (CommandId, ConnId) -> SubscriptionMode -> ExceptT StoreError IO GroupMember
|
||||
createIntroReMember
|
||||
db
|
||||
@@ -2718,25 +2552,6 @@ getUserGroupsToExpire db User {userId} globalTTL =
|
||||
where
|
||||
cond = if globalTTL == 0 then "" else " OR chat_item_ttl IS NULL"
|
||||
|
||||
hasMembersWithoutVector :: DB.Connection -> IO Bool
|
||||
hasMembersWithoutVector db =
|
||||
fromOnly . head
|
||||
<$> DB.query_
|
||||
db
|
||||
"SELECT EXISTS (SELECT 1 FROM group_members WHERE member_relations_vector IS NULL LIMIT 1)"
|
||||
|
||||
getGMsWithoutVectorIds :: DB.Connection -> IO [GroupMemberId]
|
||||
getGMsWithoutVectorIds db =
|
||||
map fromOnly <$>
|
||||
DB.query_
|
||||
db
|
||||
[sql|
|
||||
SELECT group_member_id
|
||||
FROM group_members
|
||||
WHERE member_relations_vector IS NULL
|
||||
LIMIT 1000
|
||||
|]
|
||||
|
||||
updateGroupAlias :: DB.Connection -> UserId -> GroupInfo -> LocalAlias -> IO GroupInfo
|
||||
updateGroupAlias db userId g@GroupInfo {groupId} localAlias = do
|
||||
updatedAt <- getCurrentTime
|
||||
|
||||
@@ -335,24 +335,24 @@ updateSndMsgDeliveryStatus db connId agentMsgId sndMsgDeliveryStatus = do
|
||||
|]
|
||||
(sndMsgDeliveryStatus, currentTs, connId, agentMsgId)
|
||||
|
||||
createPendingGroupMessage :: DB.Connection -> Int64 -> MessageId -> Maybe Int64 -> IO ()
|
||||
createPendingGroupMessage db groupMemberId messageId introId_ = do
|
||||
createPendingGroupMessage :: DB.Connection -> Int64 -> MessageId -> IO ()
|
||||
createPendingGroupMessage db groupMemberId messageId = do
|
||||
currentTs <- getCurrentTime
|
||||
DB.execute
|
||||
db
|
||||
[sql|
|
||||
INSERT INTO pending_group_messages
|
||||
(group_member_id, message_id, group_member_intro_id, created_at, updated_at) VALUES (?,?,?,?,?)
|
||||
(group_member_id, message_id, created_at, updated_at) VALUES (?,?,?,?)
|
||||
|]
|
||||
(groupMemberId, messageId, introId_, currentTs, currentTs)
|
||||
(groupMemberId, messageId, currentTs, currentTs)
|
||||
|
||||
getPendingGroupMessages :: DB.Connection -> Int64 -> IO [(SndMessage, ACMEventTag, Maybe Int64)]
|
||||
getPendingGroupMessages :: DB.Connection -> Int64 -> IO [SndMessage]
|
||||
getPendingGroupMessages db groupMemberId =
|
||||
map pendingGroupMessage
|
||||
<$> DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT pgm.message_id, m.shared_msg_id, m.msg_body, m.chat_msg_event, pgm.group_member_intro_id
|
||||
SELECT pgm.message_id, m.shared_msg_id, m.msg_body
|
||||
FROM pending_group_messages pgm
|
||||
JOIN messages m USING (message_id)
|
||||
WHERE pgm.group_member_id = ?
|
||||
@@ -360,8 +360,8 @@ getPendingGroupMessages db groupMemberId =
|
||||
|]
|
||||
(Only groupMemberId)
|
||||
where
|
||||
pendingGroupMessage (msgId, sharedMsgId, msgBody, cmEventTag, introId_) =
|
||||
(SndMessage {msgId, sharedMsgId, msgBody}, cmEventTag, introId_)
|
||||
pendingGroupMessage (msgId, sharedMsgId, msgBody) =
|
||||
SndMessage {msgId, sharedMsgId, msgBody}
|
||||
|
||||
deletePendingGroupMessage :: DB.Connection -> Int64 -> MessageId -> IO ()
|
||||
deletePendingGroupMessage db groupMemberId messageId =
|
||||
|
||||
@@ -22,7 +22,7 @@ import Simplex.Chat.Store.Postgres.Migrations.M20250922_remove_unused_connection
|
||||
import Simplex.Chat.Store.Postgres.Migrations.M20251007_connections_sync
|
||||
import Simplex.Chat.Store.Postgres.Migrations.M20251017_chat_tags_cascade
|
||||
import Simplex.Chat.Store.Postgres.Migrations.M20251117_member_relations_vector
|
||||
-- import Simplex.Chat.Store.Postgres.Migrations.M20251128_member_relations_vector_stage_2
|
||||
import Simplex.Chat.Store.Postgres.Migrations.M20251128_migrate_member_relations
|
||||
import Simplex.Messaging.Agent.Store.Shared (Migration (..))
|
||||
|
||||
schemaMigrations :: [(String, Text, Maybe Text)]
|
||||
@@ -44,8 +44,8 @@ schemaMigrations =
|
||||
("20250922_remove_unused_connections", m20250922_remove_unused_connections, Just down_m20250922_remove_unused_connections),
|
||||
("20251007_connections_sync", m20251007_connections_sync, Just down_m20251007_connections_sync),
|
||||
("20251017_chat_tags_cascade", m20251017_chat_tags_cascade, Just down_m20251017_chat_tags_cascade),
|
||||
("20251117_member_relations_vector", m20251117_member_relations_vector, Just down_m20251117_member_relations_vector)
|
||||
-- ("20251128_member_relations_vector_stage_2", m20251128_member_relations_vector_stage_2, Just down_m20251128_member_relations_vector_stage_2)
|
||||
("20251117_member_relations_vector", m20251117_member_relations_vector, Just down_m20251117_member_relations_vector),
|
||||
("20251128_migrate_member_relations", m20251128_migrate_member_relations, Just down_m20251128_migrate_member_relations)
|
||||
]
|
||||
|
||||
-- | The list of migrations in ascending order by date
|
||||
|
||||
@@ -7,7 +7,7 @@ import qualified Data.Text as T
|
||||
import Text.RawString.QQ (r)
|
||||
|
||||
-- This migration creates custom aggregate function migrate_relations_vector(idx, direction, intro_status).
|
||||
-- Used in live migration and stage 2 migration (M20251128_member_relations_vector_stage_2).
|
||||
-- Used in live migration and stage 2 migration (M20251128_migrate_member_relations).
|
||||
--
|
||||
-- Vector byte encoding: 4 reserved | 1 direction | 3 status
|
||||
-- Direction: 0 = IDSubjectIntroduced, 1 = IDReferencedIntroduced
|
||||
|
||||
+13
-13
@@ -1,9 +1,9 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
|
||||
module Simplex.Chat.Store.Postgres.Migrations.M20251128_member_relations_vector_stage_2 where
|
||||
module Simplex.Chat.Store.Postgres.Migrations.M20251128_migrate_member_relations where
|
||||
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
import Text.RawString.QQ (r)
|
||||
|
||||
-- Build member_relations_vector for all members that don't have it yet.
|
||||
@@ -13,11 +13,9 @@ import Text.RawString.QQ (r)
|
||||
-- - direction 0 (IDSubjectIntroduced): current member (subject) is re_group_member_id, was introduced to referenced member
|
||||
-- - direction 1 (IDReferencedIntroduced): current member (subject) is to_group_member_id, referenced member was introduced to it
|
||||
|
||||
-- TODO [relations vector] drop group_member_intros in the end of migration
|
||||
m20251128_member_relations_vector_stage_2 :: Text
|
||||
m20251128_member_relations_vector_stage_2 =
|
||||
T.pack
|
||||
[r|
|
||||
m20251128_migrate_member_relations :: Text
|
||||
m20251128_migrate_member_relations =
|
||||
[r|
|
||||
UPDATE group_members
|
||||
SET member_relations_vector = (
|
||||
SELECT migrate_relations_vector(idx, direction, intro_status)
|
||||
@@ -34,12 +32,14 @@ SET member_relations_vector = (
|
||||
) AS relations
|
||||
)
|
||||
WHERE member_relations_vector IS NULL;
|
||||
|
||||
DROP INDEX idx_pending_group_messages_group_member_intro_id;
|
||||
ALTER TABLE pending_group_messages DROP COLUMN group_member_intro_id;
|
||||
|]
|
||||
|
||||
-- TODO [relations vector] re-create group_member_intros
|
||||
down_m20251128_member_relations_vector_stage_2 :: Text
|
||||
down_m20251128_member_relations_vector_stage_2 =
|
||||
T.pack
|
||||
[r|
|
||||
|
||||
down_m20251128_migrate_member_relations :: Text
|
||||
down_m20251128_migrate_member_relations =
|
||||
[r|
|
||||
ALTER TABLE pending_group_messages ADD COLUMN group_member_intro_id BIGINT REFERENCES group_member_intros ON DELETE CASCADE;
|
||||
CREATE INDEX idx_pending_group_messages_group_member_intro_id ON pending_group_messages(group_member_intro_id);
|
||||
|]
|
||||
@@ -1033,7 +1033,6 @@ CREATE TABLE test_chat_schema.pending_group_messages (
|
||||
pending_group_message_id bigint NOT NULL,
|
||||
group_member_id bigint NOT NULL,
|
||||
message_id bigint NOT NULL,
|
||||
group_member_intro_id bigint,
|
||||
created_at timestamp with time zone DEFAULT now() NOT NULL,
|
||||
updated_at timestamp with time zone DEFAULT now() NOT NULL
|
||||
);
|
||||
@@ -2274,10 +2273,6 @@ CREATE INDEX idx_pending_group_messages_group_member_id ON test_chat_schema.pend
|
||||
|
||||
|
||||
|
||||
CREATE INDEX idx_pending_group_messages_group_member_intro_id ON test_chat_schema.pending_group_messages USING btree (group_member_intro_id);
|
||||
|
||||
|
||||
|
||||
CREATE INDEX idx_pending_group_messages_message_id ON test_chat_schema.pending_group_messages USING btree (message_id);
|
||||
|
||||
|
||||
@@ -2939,11 +2934,6 @@ ALTER TABLE ONLY test_chat_schema.pending_group_messages
|
||||
|
||||
|
||||
|
||||
ALTER TABLE ONLY test_chat_schema.pending_group_messages
|
||||
ADD CONSTRAINT pending_group_messages_group_member_intro_id_fkey FOREIGN KEY (group_member_intro_id) REFERENCES test_chat_schema.group_member_intros(group_member_intro_id) ON DELETE CASCADE;
|
||||
|
||||
|
||||
|
||||
ALTER TABLE ONLY test_chat_schema.pending_group_messages
|
||||
ADD CONSTRAINT pending_group_messages_message_id_fkey FOREIGN KEY (message_id) REFERENCES test_chat_schema.messages(message_id) ON DELETE CASCADE;
|
||||
|
||||
|
||||
@@ -145,7 +145,7 @@ import Simplex.Chat.Store.SQLite.Migrations.M20250922_remove_unused_connections
|
||||
import Simplex.Chat.Store.SQLite.Migrations.M20251007_connections_sync
|
||||
import Simplex.Chat.Store.SQLite.Migrations.M20251017_chat_tags_cascade
|
||||
import Simplex.Chat.Store.SQLite.Migrations.M20251117_member_relations_vector
|
||||
-- import Simplex.Chat.Store.SQLite.Migrations.M20251128_member_relations_vector_stage_2
|
||||
import Simplex.Chat.Store.SQLite.Migrations.M20251128_migrate_member_relations
|
||||
import Simplex.Messaging.Agent.Store.Shared (Migration (..))
|
||||
|
||||
schemaMigrations :: [(String, Query, Maybe Query)]
|
||||
@@ -290,8 +290,8 @@ schemaMigrations =
|
||||
("20250922_remove_unused_connections", m20250922_remove_unused_connections, Just down_m20250922_remove_unused_connections),
|
||||
("20251007_connections_sync", m20251007_connections_sync, Just down_m20251007_connections_sync),
|
||||
("20251017_chat_tags_cascade", m20251017_chat_tags_cascade, Just down_m20251017_chat_tags_cascade),
|
||||
("20251117_member_relations_vector", m20251117_member_relations_vector, Just down_m20251117_member_relations_vector)
|
||||
-- ("20251128_member_relations_vector_stage_2", m20251128_member_relations_vector_stage_2, Just down_m20251128_member_relations_vector_stage_2)
|
||||
("20251117_member_relations_vector", m20251117_member_relations_vector, Just down_m20251117_member_relations_vector),
|
||||
("20251128_migrate_member_relations", m20251128_migrate_member_relations, Just down_m20251128_migrate_member_relations)
|
||||
]
|
||||
|
||||
-- | The list of migrations in ascending order by date
|
||||
|
||||
@@ -15,7 +15,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Util (SQLiteFunc, SQLiteFuncFinal, m
|
||||
|
||||
-- This module defines custom aggregate function migrate_relations_vector(idx, direction, intro_status).
|
||||
-- It is passed via DBOpts and registered on DB open.
|
||||
-- Used in live migration and stage 2 migration (M20251128_member_relations_vector_stage_2).
|
||||
-- Used in live migration and stage 2 migration (M20251128_migrate_member_relations).
|
||||
--
|
||||
-- Vector byte encoding: 4 reserved | 1 direction | 3 status
|
||||
-- Direction: 0 = IDSubjectIntroduced, 1 = IDReferencedIntroduced
|
||||
|
||||
+10
-8
@@ -1,6 +1,6 @@
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
|
||||
module Simplex.Chat.Store.SQLite.Migrations.M20251128_member_relations_vector_stage_2 where
|
||||
module Simplex.Chat.Store.SQLite.Migrations.M20251128_migrate_member_relations where
|
||||
|
||||
import Database.SQLite.Simple (Query)
|
||||
import Database.SQLite.Simple.QQ (sql)
|
||||
@@ -12,9 +12,8 @@ import Database.SQLite.Simple.QQ (sql)
|
||||
-- - direction 0 (IDSubjectIntroduced): current member (subject) is re_group_member_id, was introduced to referenced member
|
||||
-- - direction 1 (IDReferencedIntroduced): current member (subject) is to_group_member_id, referenced member was introduced to it
|
||||
|
||||
-- TODO [relations vector] drop group_member_intros in the end of migration
|
||||
m20251128_member_relations_vector_stage_2 :: Query
|
||||
m20251128_member_relations_vector_stage_2 =
|
||||
m20251128_migrate_member_relations :: Query
|
||||
m20251128_migrate_member_relations =
|
||||
[sql|
|
||||
UPDATE group_members
|
||||
SET member_relations_vector = (
|
||||
@@ -32,11 +31,14 @@ SET member_relations_vector = (
|
||||
)
|
||||
)
|
||||
WHERE member_relations_vector IS NULL;
|
||||
|
||||
DROP INDEX idx_pending_group_messages_group_member_intro_id;
|
||||
ALTER TABLE pending_group_messages DROP COLUMN group_member_intro_id;
|
||||
|]
|
||||
|
||||
-- TODO [relations vector] re-create group_member_intros
|
||||
down_m20251128_member_relations_vector_stage_2 :: Query
|
||||
down_m20251128_member_relations_vector_stage_2 =
|
||||
down_m20251128_migrate_member_relations :: Query
|
||||
down_m20251128_migrate_member_relations =
|
||||
[sql|
|
||||
|
||||
ALTER TABLE pending_group_messages ADD COLUMN group_member_intro_id INTEGER REFERENCES group_member_intros ON DELETE CASCADE;
|
||||
CREATE INDEX idx_pending_group_messages_group_member_intro_id ON pending_group_messages(group_member_intro_id);
|
||||
|]
|
||||
@@ -3427,14 +3427,6 @@ Query:
|
||||
Plan:
|
||||
SEARCH chat_item_reactions USING INDEX idx_chat_item_reactions_group (group_id=? AND shared_msg_id=?)
|
||||
|
||||
Query:
|
||||
SELECT group_member_intro_id, intro_status
|
||||
FROM group_member_intros
|
||||
WHERE re_group_member_id = ? AND to_group_member_id = ?
|
||||
|
||||
Plan:
|
||||
SEARCH group_member_intros USING INDEX sqlite_autoindex_group_member_intros_1 (re_group_member_id=? AND to_group_member_id=?)
|
||||
|
||||
Query:
|
||||
SELECT group_scope_tag, group_scope_group_member_id
|
||||
FROM chat_items
|
||||
@@ -3484,7 +3476,7 @@ SEARCH m USING INDEX idx_group_members_user_id (user_id=?)
|
||||
SEARCH p USING INTEGER PRIMARY KEY (rowid=?)
|
||||
|
||||
Query:
|
||||
SELECT pgm.message_id, m.shared_msg_id, m.msg_body, m.chat_msg_event, pgm.group_member_intro_id
|
||||
SELECT pgm.message_id, m.shared_msg_id, m.msg_body
|
||||
FROM pending_group_messages pgm
|
||||
JOIN messages m USING (message_id)
|
||||
WHERE pgm.group_member_id = ?
|
||||
@@ -3715,52 +3707,6 @@ SEARCH connections USING INDEX idx_connections_group_member_id (group_member_id=
|
||||
LIST SUBQUERY 1
|
||||
SCAN group_members USING COVERING INDEX idx_group_members_user_id_local_display_name
|
||||
|
||||
Query:
|
||||
UPDATE group_member_intros SET intro_status='fwd'
|
||||
WHERE re_group_member_id IN (SELECT group_member_id FROM group_members WHERE local_display_name = ?)
|
||||
AND to_group_member_id IN (SELECT group_member_id FROM group_members WHERE local_display_name = ?)
|
||||
|
||||
Plan:
|
||||
SEARCH group_member_intros USING INDEX sqlite_autoindex_group_member_intros_1 (re_group_member_id=? AND to_group_member_id=?)
|
||||
LIST SUBQUERY 1
|
||||
SCAN group_members USING COVERING INDEX idx_group_members_user_id_local_display_name
|
||||
LIST SUBQUERY 2
|
||||
SCAN group_members USING COVERING INDEX idx_group_members_user_id_local_display_name
|
||||
|
||||
Query:
|
||||
UPDATE group_members
|
||||
SET
|
||||
member_relations_vector = (
|
||||
SELECT migrate_relations_vector(idx, direction, intro_status)
|
||||
FROM (
|
||||
SELECT m.index_in_group AS idx, 0 AS direction, i.intro_status
|
||||
FROM group_member_intros i
|
||||
JOIN group_members m ON m.group_member_id = i.to_group_member_id
|
||||
WHERE i.re_group_member_id = group_members.group_member_id
|
||||
UNION ALL
|
||||
SELECT m.index_in_group AS idx, 1 AS direction, i.intro_status
|
||||
FROM group_member_intros i
|
||||
JOIN group_members m ON m.group_member_id = i.re_group_member_id
|
||||
WHERE i.to_group_member_id = group_members.group_member_id
|
||||
) AS relations
|
||||
),
|
||||
updated_at = ?
|
||||
WHERE group_member_id = ?
|
||||
AND member_relations_vector IS NULL
|
||||
|
||||
Plan:
|
||||
SEARCH group_members USING INTEGER PRIMARY KEY (rowid=?)
|
||||
CORRELATED SCALAR SUBQUERY 3
|
||||
CO-ROUTINE relations
|
||||
COMPOUND QUERY
|
||||
LEFT-MOST SUBQUERY
|
||||
SEARCH i USING INDEX idx_group_member_intros_re_group_member_id (re_group_member_id=?)
|
||||
SEARCH m USING INTEGER PRIMARY KEY (rowid=?)
|
||||
UNION ALL
|
||||
SEARCH i USING INDEX idx_group_member_intros_to_group_member_id (to_group_member_id=?)
|
||||
SEARCH m USING INTEGER PRIMARY KEY (rowid=?)
|
||||
SCAN relations
|
||||
|
||||
Query:
|
||||
UPDATE group_members
|
||||
SET contact_id = ?, local_display_name = ?, contact_profile_id = ?, updated_at = ?
|
||||
@@ -4432,7 +4378,7 @@ Plan:
|
||||
|
||||
Query:
|
||||
INSERT INTO pending_group_messages
|
||||
(group_member_id, message_id, group_member_intro_id, created_at, updated_at) VALUES (?,?,?,?,?)
|
||||
(group_member_id, message_id, created_at, updated_at) VALUES (?,?,?,?)
|
||||
|
||||
Plan:
|
||||
|
||||
@@ -6089,10 +6035,6 @@ Plan:
|
||||
Query: INSERT INTO xftp_file_descriptions (user_id, file_descr_text, file_descr_part_no, file_descr_complete, created_at, updated_at) VALUES (?,?,?,?,?,?)
|
||||
Plan:
|
||||
|
||||
Query: SELECT 1 FROM group_members WHERE group_member_id = ? AND member_relations_vector IS NOT NULL
|
||||
Plan:
|
||||
SEARCH group_members USING INTEGER PRIMARY KEY (rowid=?)
|
||||
|
||||
Query: SELECT 1 FROM settings WHERE user_id = ? LIMIT 1
|
||||
Plan:
|
||||
SEARCH settings USING COVERING INDEX idx_settings_user_id (user_id=?)
|
||||
@@ -6123,12 +6065,6 @@ SCAN CONSTANT ROW
|
||||
SCALAR SUBQUERY 1
|
||||
SEARCH chat_items USING COVERING INDEX idx_chat_items_contacts_created_at (user_id=? AND contact_id=?)
|
||||
|
||||
Query: SELECT EXISTS (SELECT 1 FROM group_members WHERE member_relations_vector IS NULL LIMIT 1)
|
||||
Plan:
|
||||
SCAN CONSTANT ROW
|
||||
SCALAR SUBQUERY 1
|
||||
SCAN group_members
|
||||
|
||||
Query: SELECT accepted_at FROM operator_usage_conditions WHERE server_operator_id = ? AND conditions_commit = ?
|
||||
Plan:
|
||||
SEARCH operator_usage_conditions USING INDEX idx_operator_usage_conditions_conditions_commit (conditions_commit=? AND server_operator_id=?)
|
||||
@@ -6509,14 +6445,6 @@ Query: UPDATE files SET private_snd_file_descr = ?, updated_at = ? WHERE user_id
|
||||
Plan:
|
||||
SEARCH files USING INTEGER PRIMARY KEY (rowid=?)
|
||||
|
||||
Query: UPDATE group_member_intros SET intro_status='con'
|
||||
Plan:
|
||||
SCAN group_member_intros
|
||||
|
||||
Query: UPDATE group_member_intros SET intro_status='fwd'
|
||||
Plan:
|
||||
SCAN group_member_intros
|
||||
|
||||
Query: UPDATE group_members SET contact_id = ?, updated_at = ? WHERE contact_profile_id = ?
|
||||
Plan:
|
||||
SEARCH group_members USING COVERING INDEX idx_group_members_contact_profile_id (contact_profile_id=?)
|
||||
|
||||
@@ -393,7 +393,6 @@ CREATE TABLE pending_group_messages(
|
||||
pending_group_message_id INTEGER PRIMARY KEY,
|
||||
group_member_id INTEGER NOT NULL REFERENCES group_members ON DELETE CASCADE,
|
||||
message_id INTEGER NOT NULL REFERENCES messages ON DELETE CASCADE,
|
||||
group_member_intro_id INTEGER REFERENCES group_member_intros ON DELETE CASCADE,
|
||||
created_at TEXT NOT NULL DEFAULT(datetime('now')),
|
||||
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
|
||||
);
|
||||
@@ -804,9 +803,6 @@ CREATE INDEX idx_group_profiles_user_id ON group_profiles(user_id);
|
||||
CREATE INDEX idx_groups_chat_item_id ON groups(chat_item_id);
|
||||
CREATE INDEX idx_groups_group_profile_id ON groups(group_profile_id);
|
||||
CREATE INDEX idx_messages_group_id ON messages(group_id);
|
||||
CREATE INDEX idx_pending_group_messages_group_member_intro_id ON pending_group_messages(
|
||||
group_member_intro_id
|
||||
);
|
||||
CREATE INDEX idx_pending_group_messages_message_id ON pending_group_messages(
|
||||
message_id
|
||||
);
|
||||
|
||||
@@ -1747,49 +1747,6 @@ instance TextEncoding ConnType where
|
||||
ConnMember -> "member"
|
||||
ConnUserContact -> "user_contact"
|
||||
|
||||
data GroupMemberIntro = GroupMemberIntro
|
||||
{ introId :: Int64,
|
||||
reMember :: GroupMember,
|
||||
toMember :: GroupMember,
|
||||
introStatus :: GroupMemberIntroStatus
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
data GroupMemberIntroStatus
|
||||
= GMIntroPending
|
||||
| GMIntroSent
|
||||
| GMIntroInvReceived
|
||||
| GMIntroInvForwarded
|
||||
| GMIntroReConnected
|
||||
| GMIntroToConnected
|
||||
| GMIntroConnected
|
||||
deriving (Eq, Show)
|
||||
|
||||
instance FromField GroupMemberIntroStatus where fromField = fromTextField_ introStatusT
|
||||
|
||||
instance ToField GroupMemberIntroStatus where toField = toField . serializeIntroStatus
|
||||
|
||||
introStatusT :: Text -> Maybe GroupMemberIntroStatus
|
||||
introStatusT = \case
|
||||
"new" -> Just GMIntroPending
|
||||
"sent" -> Just GMIntroSent
|
||||
"rcv" -> Just GMIntroInvReceived
|
||||
"fwd" -> Just GMIntroInvForwarded
|
||||
"re-con" -> Just GMIntroReConnected
|
||||
"to-con" -> Just GMIntroToConnected
|
||||
"con" -> Just GMIntroConnected
|
||||
_ -> Nothing
|
||||
|
||||
serializeIntroStatus :: GroupMemberIntroStatus -> Text
|
||||
serializeIntroStatus = \case
|
||||
GMIntroPending -> "new"
|
||||
GMIntroSent -> "sent"
|
||||
GMIntroInvReceived -> "rcv"
|
||||
GMIntroInvForwarded -> "fwd"
|
||||
GMIntroReConnected -> "re-con"
|
||||
GMIntroToConnected -> "to-con"
|
||||
GMIntroConnected -> "con"
|
||||
|
||||
type CommandId = Int64
|
||||
|
||||
aCorrId :: CommandId -> ACorrId
|
||||
|
||||
@@ -1774,8 +1774,6 @@ testGroupDelayedModeration ps = do
|
||||
-- imitate not implemented group forwarding
|
||||
-- (real client wouldn't have forwarding code, but tests use "current code" with configured version,
|
||||
-- and forwarding client doesn't check compatibility)
|
||||
void $ withCCTransaction alice $ \db ->
|
||||
DB.execute_ db "UPDATE group_member_intros SET intro_status='con'"
|
||||
updateGroupForwardingVectors alice "bob" "cath" MRConnected
|
||||
|
||||
cath #> "#team hi" -- message is pending for bob
|
||||
@@ -1821,8 +1819,6 @@ testGroupDelayedModerationFullDelete ps = do
|
||||
-- imitate not implemented group forwarding
|
||||
-- (real client wouldn't have forwarding code, but tests use "current code" with configured version,
|
||||
-- and forwarding client doesn't check compatibility)
|
||||
void $ withCCTransaction alice $ \db ->
|
||||
DB.execute_ db "UPDATE group_member_intros SET intro_status='con'"
|
||||
updateGroupForwardingVectors alice "bob" "cath" MRConnected
|
||||
|
||||
cath #> "#team hi" -- message is pending for bob
|
||||
@@ -5049,15 +5045,6 @@ setupGroupForwarding host invitee1 invitee2 = do
|
||||
WHERE group_member_id IN (SELECT group_member_id FROM group_members WHERE local_display_name = ?)
|
||||
|]
|
||||
(Only invitee1Name)
|
||||
void $ withCCTransaction host $ \db ->
|
||||
DB.execute
|
||||
db
|
||||
[sql|
|
||||
UPDATE group_member_intros SET intro_status='fwd'
|
||||
WHERE re_group_member_id IN (SELECT group_member_id FROM group_members WHERE local_display_name = ?)
|
||||
AND to_group_member_id IN (SELECT group_member_id FROM group_members WHERE local_display_name = ?)
|
||||
|]
|
||||
(invitee1Name, invitee2Name)
|
||||
|
||||
setupGroupForwardingVectors host invitee1 invitee2
|
||||
|
||||
@@ -5110,8 +5097,6 @@ testGroupMsgForwardDeduplicate =
|
||||
createGroup3 "team" alice bob cath
|
||||
|
||||
threadDelay 1000000 -- delay so member relations don't get overwritten to connected
|
||||
void $ withCCTransaction alice $ \db ->
|
||||
DB.execute_ db "UPDATE group_member_intros SET intro_status='fwd'"
|
||||
setupGroupForwardingVectors alice bob cath
|
||||
|
||||
bob #> "#team hi there"
|
||||
|
||||
@@ -76,5 +76,7 @@ postgresSchemaDumpTest migrations testDBOpts@DBOpts {connstr, schema = testDBSch
|
||||
skipComparisonForDownMigrations :: [String]
|
||||
skipComparisonForDownMigrations =
|
||||
[ -- via_group field moves
|
||||
"20250922_remove_unused_connections"
|
||||
"20250922_remove_unused_connections",
|
||||
-- group_member_intro_id field moves
|
||||
"20251128_migrate_member_relations"
|
||||
]
|
||||
|
||||
+3
-1
@@ -132,7 +132,9 @@ skipComparisonForDownMigrations =
|
||||
-- index moves down to the end of the file
|
||||
"20250721_indexes",
|
||||
-- indexes move down to the end of the file
|
||||
"20250922_remove_unused_connections"
|
||||
"20250922_remove_unused_connections",
|
||||
-- group_member_intros table moves down to the end of the file
|
||||
"20251128_migrate_member_relations"
|
||||
]
|
||||
|
||||
getSchema :: FilePath -> FilePath -> IO String
|
||||
|
||||
Reference in New Issue
Block a user