core: improve database concurrency (#6541)

* core: improve database concurrency

* tests: prints on timeouts (#6546)

* update simplexmq

* fix test

* update simplexmq

---------

Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com>
This commit is contained in:
Evgeny
2026-01-08 13:43:37 +00:00
committed by GitHub
parent d6eebd52fc
commit 3596c37275
10 changed files with 139 additions and 116 deletions
+9 -9
View File
@@ -707,8 +707,8 @@ acceptFileReceive user@User {userId} RcvFileTransfer {fileId, xftpRcvFile, fileI
if
| inline -> do
-- accepting inline
ci <- withStore $ \db -> acceptRcvInlineFT db vr user fileId filePath
sharedMsgId <- withStore $ \db -> getSharedMsgIdByFileId db userId fileId
(ci, sharedMsgId) <- withStore $ \db ->
liftM2 (,) (acceptRcvInlineFT db vr user fileId filePath) (getSharedMsgIdByFileId db userId fileId)
send $ XFileAcptInv sharedMsgId Nothing fName
pure ci
| fileInline == Just IFMSent -> throwChatError $ CEFileAlreadyReceiving fName
@@ -925,9 +925,11 @@ acceptGroupJoinRequestAsync
incognitoProfile = do
gVar <- asks random
let initialStatus = acceptanceToStatus (memberAdmission groupProfile) gAccepted
(groupMemberId, memberId) <- withStore $ \db ->
createJoiningMember db gVar user gInfo cReqChatVRange cReqProfile cReqXContactId_ welcomeMsgId_ gLinkMemRole initialStatus
currentMemCount <- withStore' $ \db -> getGroupCurrentMembersCount db user gInfo
((groupMemberId, memberId), currentMemCount) <- withStore $ \db ->
liftM2
(,)
(createJoiningMember db gVar user gInfo cReqChatVRange cReqProfile cReqXContactId_ welcomeMsgId_ gLinkMemRole initialStatus)
(liftIO $ getGroupCurrentMembersCount db user gInfo)
let Profile {displayName} = userProfileInGroup user gInfo (fromIncognitoProfile <$> incognitoProfile)
GroupMember {memberRole = userRole, memberId = userMemberId} = membership
msg =
@@ -1041,15 +1043,13 @@ introduceToModerators vr user gInfo@GroupInfo {groupId} m@GroupMember {memberRol
introduceToAll :: VersionRangeChat -> User -> GroupInfo -> GroupMember -> CM ()
introduceToAll vr user gInfo m = do
members <- withStore' $ \db -> getGroupMembers db vr user gInfo
vector <- withStore (`getMemberRelationsVector` m)
(members, vector) <- withStore $ \db -> liftM2 (,) (liftIO $ getGroupMembers db vr user gInfo) (getMemberRelationsVector db m)
let recipients = filter (shouldIntroduce m vector) members
introduceMember user gInfo m recipients Nothing
introduceToRemaining :: VersionRangeChat -> User -> GroupInfo -> GroupMember -> CM ()
introduceToRemaining vr user gInfo m = do
members <- withStore' $ \db -> getGroupMembers db vr user gInfo
vector <- withStore (`getMemberRelationsVector` m)
(members, vector) <- withStore $ \db -> liftM2 (,) (liftIO $ getGroupMembers db vr user gInfo) (getMemberRelationsVector db m)
let recipients = filter (shouldIntroduce m vector) members
introduceMember user gInfo m recipients Nothing
+45 -36
View File
@@ -691,9 +691,10 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
-- TODO REMOVE LEGACY vvv
-- [async agent commands] group link auto-accept continuation on receiving INV
CFCreateConnGrpInv -> do
ct <- withStore $ \db -> getContactViaMember db vr user m
withStore' $ \db -> setNewContactMemberConnRequest db user m cReq
groupLinkId <- withStore' $ \db -> getGroupLinkId db user gInfo
(ct, groupLinkId) <- withStore $ \db -> do
ct <- getContactViaMember db vr user m
liftIO $ setNewContactMemberConnRequest db user m cReq
liftIO $ (ct,) <$> getGroupLinkId db user gInfo
sendGrpInvitation ct m groupLinkId
toView $ CEvtSentGroupInvitation user gInfo ct m
where
@@ -1814,8 +1815,9 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
ts@(_, ft_) = msgContentTexts mc
live = fromMaybe False live_
updateRcvChatItem = do
cci <- withStore $ \db -> getGroupChatItemBySharedMsgId db user gInfo groupMemberId sharedMsgId
scopeInfo <- withStore $ \db -> getGroupChatScopeInfoForItem db vr user gInfo (cChatItemId cci)
(cci, scopeInfo) <- withStore $ \db -> do
cci <- getGroupChatItemBySharedMsgId db user gInfo groupMemberId sharedMsgId
(cci,) <$> getGroupChatScopeInfoForItem db vr user gInfo (cChatItemId cci)
case cci of
CChatItem SMDRcv ci@ChatItem {chatDir = CIGroupRcv m', meta = CIMeta {itemLive}, content = CIRcvMsgContent oldMC} ->
if sameMemberId memberId m'
@@ -1948,8 +1950,9 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
xFileCancel :: Contact -> SharedMsgId -> CM ()
xFileCancel Contact {contactId} sharedMsgId = do
fileId <- withStore $ \db -> getFileIdBySharedMsgId db userId contactId sharedMsgId
ft <- withStore (\db -> getRcvFileTransfer db user fileId)
(fileId, ft) <- withStore $ \db -> do
fileId <- getFileIdBySharedMsgId db userId contactId sharedMsgId
(fileId,) <$> getRcvFileTransfer db user fileId
unless (rcvFileCompleteOrCancelled ft) $ do
cancelRcvFileTransfer user ft
ci <- withStore $ \db -> getChatItemByFileId db vr user fileId
@@ -1957,8 +1960,9 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
xFileAcptInv :: Contact -> SharedMsgId -> Maybe ConnReqInvitation -> String -> CM ()
xFileAcptInv ct sharedMsgId fileConnReq_ fName = do
fileId <- withStore $ \db -> getDirectFileIdBySharedMsgId db user ct sharedMsgId
(AChatItem _ _ _ ci) <- withStore $ \db -> getChatItemByFileId db vr user fileId
(fileId, AChatItem _ _ _ ci) <- withStore $ \db -> do
fileId <- getDirectFileIdBySharedMsgId db user ct sharedMsgId
(fileId,) <$> getChatItemByFileId db vr user fileId
assertSMPAcceptNotProhibited ci
ft@FileTransferMeta {fileName, fileSize, fileInline, cancelled} <- withStore (\db -> getFileTransferMeta db user fileId)
-- [async agent commands] no continuation needed, but command should be asynchronous for stability
@@ -2033,8 +2037,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
xFileCancelGroup g@GroupInfo {groupId} GroupMember {memberId} sharedMsgId = do
(fileId, aci) <- withStore $ \db -> do
fileId <- getGroupFileIdBySharedMsgId db userId groupId sharedMsgId
aci <- getChatItemByFileId db vr user fileId
pure (fileId, aci)
(fileId,) <$> getChatItemByFileId db vr user fileId
case aci of
AChatItem SCTGroup SMDRcv (GroupChat _g scopeInfo) ChatItem {chatDir = CIGroupRcv m} -> do
if sameMemberId memberId m
@@ -2051,8 +2054,9 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
xFileAcptInvGroup :: GroupInfo -> GroupMember -> SharedMsgId -> Maybe ConnReqInvitation -> String -> CM ()
xFileAcptInvGroup GroupInfo {groupId} m@GroupMember {activeConn} sharedMsgId fileConnReq_ fName = do
fileId <- withStore $ \db -> getGroupFileIdBySharedMsgId db userId groupId sharedMsgId
(AChatItem _ _ _ ci) <- withStore $ \db -> getChatItemByFileId db vr user fileId
(fileId, AChatItem _ _ _ ci) <- withStore $ \db -> do
fileId <- getGroupFileIdBySharedMsgId db userId groupId sharedMsgId
(fileId,) <$> getChatItemByFileId db vr user fileId
assertSMPAcceptNotProhibited ci
-- TODO check that it's not already accepted
ft@FileTransferMeta {fileName, fileSize, fileInline, cancelled} <- withStore (\db -> getFileTransferMeta db user fileId)
@@ -2123,8 +2127,9 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
xDirectDel c msg msgMeta =
if directOrUsed c
then do
ct' <- withStore' $ \db -> updateContactStatus db user c CSDeleted
contactConns <- withStore' $ \db -> getContactConnections db vr userId ct'
(ct', contactConns) <- withStore' $ \db -> do
ct' <- updateContactStatus db user c CSDeleted
(ct',) <$> getContactConnections db vr userId ct'
deleteAgentConnectionsAsync $ map aConnId contactConns
forM_ contactConns $ \conn -> withStore' $ \db -> updateConnectionStatus db conn ConnDeleted
activeConn' <- forM (contactConn ct') $ \conn -> pure conn {connStatus = ConnDeleted}
@@ -2496,15 +2501,16 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
associateMemberWithContact :: Contact -> GroupMember -> CM Contact
associateMemberWithContact c1 m2@GroupMember {groupId} = do
withStore' $ \db -> associateMemberWithContactRecord db user c1 m2
g <- withStore $ \db -> getGroupInfo db vr user groupId
g <- withStore $ \db -> do
liftIO $ associateMemberWithContactRecord db user c1 m2
getGroupInfo db vr user groupId
toView $ CEvtContactAndMemberAssociated user c1 g m2 c1
pure c1
associateContactWithMember :: GroupMember -> Contact -> CM Contact
associateContactWithMember m1@GroupMember {groupId} c2 = do
c2' <- withStore $ \db -> associateContactWithMemberRecord db vr user m1 c2
g <- withStore $ \db -> getGroupInfo db vr user groupId
(c2', g) <- withStore $ \db ->
liftM2 (,) (associateContactWithMemberRecord db vr user m1 c2) (getGroupInfo db vr user groupId)
toView $ CEvtContactAndMemberAssociated user c2 g m1 c2'
pure c2'
@@ -2622,19 +2628,21 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
xGrpMemFwd gInfo@GroupInfo {membership, chatSettings} m memInfo@(MemberInfo memId memRole memChatVRange _) IntroInvitation {groupConnReq, directConnReq} = do
let GroupMember {memberId = membershipMemId} = membership
checkHostRole m memRole
toMember <-
withStore' (\db -> runExceptT $ getGroupMemberByMemberId db vr user gInfo memId) >>= \case
toMember <- withStore $ \db -> do
toMember <- getGroupMemberByMemberId db vr user gInfo memId
-- TODO if the missed messages are correctly sent as soon as there is connection before anything else is sent
-- the situation when member does not exist is an error
-- member receiving x.grp.mem.fwd should have also received x.grp.mem.new prior to that.
-- For now, this branch compensates for the lack of delayed message delivery.
Left _ -> withStore $ \db -> createNewGroupMember db user gInfo m memInfo GCPostMember GSMemAnnounced
Right m' -> pure m'
-- TODO [knocking] separate pending statuses from GroupMemberStatus?
-- TODO add GSMemIntroInvitedPending, GSMemConnectedPending, etc.?
-- TODO keep as is? (GSMemIntroInvited has no purpose)
let newMemberStatus = if memberPending toMember then memberStatus toMember else GSMemIntroInvited
withStore' $ \db -> updateGroupMemberStatus db userId toMember newMemberStatus
`catchError` \case
SEGroupMemberNotFoundByMemberId _ -> createNewGroupMember db user gInfo m memInfo GCPostMember GSMemAnnounced
e -> throwError e
-- TODO [knocking] separate pending statuses from GroupMemberStatus?
-- TODO add GSMemIntroInvitedPending, GSMemConnectedPending, etc.?
-- TODO keep as is? (GSMemIntroInvited has no purpose)
let newMemberStatus = if memberPending toMember then memberStatus toMember else GSMemIntroInvited
liftIO $ updateGroupMemberStatus db userId toMember newMemberStatus
pure toMember
subMode <- chatReadVar subscriptionMode
-- [incognito] send membership incognito profile, create direct connection as incognito
let membershipProfile = redactedMemberProfile allowSimplexLinks $ fromLocalProfile $ memberProfile membership
@@ -3021,14 +3029,15 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
updateGroupItemsStatus :: GroupInfo -> GroupMember -> Connection -> AgentMsgId -> GroupSndStatus -> Maybe Bool -> CM ()
updateGroupItemsStatus gInfo@GroupInfo {groupId} GroupMember {groupMemberId} Connection {connId} msgId newMemStatus viaProxy_ = do
items <- withStore' (\db -> getGroupChatItemsByAgentMsgId db user groupId connId msgId)
cis <- catMaybes <$> withStore (\db -> mapM (updateItem db) items)
-- SENT and RCVD events are received for messages that may be batched in single scope,
-- so we can look up scope of first item
scopeInfo <- case cis of
(ci : _) -> withStore $ \db -> getGroupChatScopeInfoForItem db vr user gInfo (chatItemId' ci)
_ -> pure Nothing
let acis = map (gItem scopeInfo) cis
acis <- withStore $ \db -> do
items <- liftIO $ getGroupChatItemsByAgentMsgId db user groupId connId msgId
cis <- catMaybes <$> mapM (updateItem db) items
-- SENT and RCVD events are received for messages that may be batched in single scope,
-- so we can look up scope of first item
scopeInfo <- case cis of
(ci : _) -> getGroupChatScopeInfoForItem db vr user gInfo (chatItemId' ci)
_ -> pure Nothing
pure $ map (gItem scopeInfo) cis
unless (null acis) $ toView $ CEvtChatItemsStatusesUpdated user acis
where
gItem scopeInfo ci = AChatItem SCTGroup SMDSnd (GroupChat gInfo scopeInfo) ci
+6 -6
View File
@@ -1596,11 +1596,11 @@ setMemberVectorNewRelations db GroupMember {groupMemberId} relations = do
v_ <- maybeFirstRow fromOnly $
DB.query
db
( "SELECT member_relations_vector FROM group_members WHERE group_member_id = ?"
#if defined(dbPostgres)
"SELECT member_relations_vector FROM group_members WHERE group_member_id = ? FOR UPDATE"
#else
"SELECT member_relations_vector FROM group_members WHERE group_member_id = ?"
<> " FOR UPDATE"
#endif
)
(Only groupMemberId)
let v' = setNewRelations relations $ fromMaybe B.empty v_
currentTs <- getCurrentTime
@@ -1638,11 +1638,11 @@ setMemberVectorRelationConnected db GroupMember {groupMemberId} GroupMember {ind
firstRow fromOnly (SEMemberRelationsVectorNotFound groupMemberId) $
DB.query
db
( "SELECT member_relations_vector FROM group_members WHERE group_member_id = ? AND member_relations_vector IS NOT NULL"
#if defined(dbPostgres)
"SELECT member_relations_vector FROM group_members WHERE group_member_id = ? AND member_relations_vector IS NOT NULL FOR UPDATE"
#else
"SELECT member_relations_vector FROM group_members WHERE group_member_id = ? AND member_relations_vector IS NOT NULL"
<> " FOR UPDATE"
#endif
)
(Only groupMemberId)
let v' = setRelationConnected indexInGroup newStatus v
currentTs <- liftIO getCurrentTime
+22 -24
View File
@@ -52,7 +52,6 @@ module Simplex.Chat.Store.Messages
getDirectChatItemLast,
getAllChatItems,
getAChatItem,
getAChatItemBySharedMsgId,
updateDirectChatItem,
updateDirectChatItem',
addInitialAndNewCIVersions,
@@ -1235,13 +1234,17 @@ getDirectChatItemLast db user@User {userId} contactId = do
ExceptT . firstRow fromOnly (SEChatItemNotFoundByContactId contactId) $
DB.query
db
[sql|
SELECT chat_item_id
FROM chat_items
WHERE user_id = ? AND contact_id = ?
ORDER BY created_at DESC, chat_item_id DESC
LIMIT 1
|]
( [sql|
SELECT chat_item_id
FROM chat_items
WHERE user_id = ? AND contact_id = ?
ORDER BY created_at DESC, chat_item_id DESC
LIMIT 1
|]
#if defined(dbPostgres)
<> " FOR UPDATE"
#endif
)
(userId, contactId)
getDirectChatItem db user contactId chatItemId
@@ -1560,13 +1563,17 @@ getGroupMemberChatItemLast db user@User {userId} groupId groupMemberId = do
ExceptT . firstRow fromOnly (SEChatItemNotFoundByGroupId groupId) $
DB.query
db
[sql|
SELECT chat_item_id
FROM chat_items
WHERE user_id = ? AND group_id = ? AND group_member_id = ?
ORDER BY item_ts DESC, chat_item_id DESC
LIMIT 1
|]
( [sql|
SELECT chat_item_id
FROM chat_items
WHERE user_id = ? AND group_id = ? AND group_member_id = ?
ORDER BY item_ts DESC, chat_item_id DESC
LIMIT 1
|]
#if defined(dbPostgres)
<> " FOR UPDATE"
#endif
)
(userId, groupId, groupMemberId)
getGroupChatItem db user groupId chatItemId
@@ -3243,15 +3250,6 @@ getAChatItem db vr user (ChatRef cType chatId scope) itemId = do
_ -> throwError $ SEChatItemNotFound itemId
liftIO $ getACIReactions db aci
getAChatItemBySharedMsgId :: ChatTypeQuotable c => DB.Connection -> User -> ChatDirection c 'MDRcv -> SharedMsgId -> ExceptT StoreError IO AChatItem
getAChatItemBySharedMsgId db user cd sharedMsgId = case cd of
CDDirectRcv ct@Contact {contactId} -> do
(CChatItem msgDir ci) <- getDirectChatItemBySharedMsgId db user contactId sharedMsgId
pure $ AChatItem SCTDirect msgDir (DirectChat ct) ci
CDGroupRcv g scopeInfo GroupMember {groupMemberId} -> do
(CChatItem msgDir ci) <- getGroupChatItemBySharedMsgId db user g groupMemberId sharedMsgId
pure $ AChatItem SCTGroup msgDir (GroupChat g scopeInfo) ci
getChatItemVersions :: DB.Connection -> ChatItemId -> IO [ChatItemVersion]
getChatItemVersions db itemId = do
map toChatItemVersion