From 069395c2a0bd3c8793182bd7433e36ba290a8864 Mon Sep 17 00:00:00 2001 From: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> Date: Thu, 4 Apr 2024 22:24:42 +0400 Subject: [PATCH] core: entity locks (#3962) * core: entity locks * more locks * update sha256map * add delay * clean up * empty * fix tests * empty * empty * more delays * empty * comment delays * Revert "comment delays" This reverts commit 4245b545fba4dc20d2bc7ce08c0cd46907722fc6. * Revert "Revert "comment delays"" This reverts commit f803386945abe6a3e1d9d94e31e9ed9af2061330. * take lock in the beginning of processing loop * empty * empty * remove lock * rework file locks * empty * fix * empty * add connection locks * empty * fix test * empty * remove commented delays * add to debug locks * update * refactor * refactor --------- Co-authored-by: Evgeny Poberezkin --- cabal.project | 2 +- scripts/nix/sha256map.nix | 2 +- src/Simplex/Chat.hs | 349 ++++++++++++++------------ src/Simplex/Chat/Controller.hs | 15 +- src/Simplex/Chat/Protocol.hs | 2 +- src/Simplex/Chat/Store.hs | 1 + src/Simplex/Chat/Store/Connections.hs | 29 ++- src/Simplex/Chat/Store/Files.hs | 6 +- src/Simplex/Chat/Store/Shared.hs | 9 + src/Simplex/Chat/View.hs | 5 +- tests/ChatTests/Files.hs | 3 +- tests/ChatTests/Profiles.hs | 10 +- 12 files changed, 255 insertions(+), 178 deletions(-) diff --git a/cabal.project b/cabal.project index 62115c136b..4e8cfd22ee 100644 --- a/cabal.project +++ b/cabal.project @@ -12,7 +12,7 @@ constraints: zip +disable-bzip2 +disable-zstd source-repository-package type: git location: https://github.com/simplex-chat/simplexmq.git - tag: 6bc4f6c94e11f59604b0d9c576e62e01bc08b4cd + tag: 791368c7beb3996ab4a10f25dbf8cad1e289b413 source-repository-package type: git diff --git a/scripts/nix/sha256map.nix b/scripts/nix/sha256map.nix index d2701dc45f..a671f28751 100644 --- a/scripts/nix/sha256map.nix +++ b/scripts/nix/sha256map.nix @@ -1,5 +1,5 @@ { - "https://github.com/simplex-chat/simplexmq.git"."6bc4f6c94e11f59604b0d9c576e62e01bc08b4cd" = "08l00ay1ibz7skhlpfjp6z2821zpfd0kxplwdm6zc63m2f6za7cv"; + "https://github.com/simplex-chat/simplexmq.git"."791368c7beb3996ab4a10f25dbf8cad1e289b413" = "0wbxk69lv6h17b5rdqskxwhc1wfvn1zi8q4a4w57qfzkzyaxkymk"; "https://github.com/simplex-chat/hs-socks.git"."a30cc7a79a08d8108316094f8f2f82a0c5e1ac51" = "0yasvnr7g91k76mjkamvzab2kvlb1g5pspjyjn2fr6v83swjhj38"; "https://github.com/simplex-chat/direct-sqlcipher.git"."f814ee68b16a9447fbb467ccc8f29bdd3546bfd9" = "1ql13f4kfwkbaq7nygkxgw84213i0zm7c1a8hwvramayxl38dq5d"; "https://github.com/simplex-chat/sqlcipher-simple.git"."a46bd361a19376c5211f1058908fc0ae6bf42446" = "1z0r78d8f0812kxbgsm735qf6xx8lvaz27k1a0b4a2m0sshpd5gl"; diff --git a/src/Simplex/Chat.hs b/src/Simplex/Chat.hs index 7948875180..c62fdfb8bd 100644 --- a/src/Simplex/Chat.hs +++ b/src/Simplex/Chat.hs @@ -88,9 +88,9 @@ import Simplex.FileTransfer.Description (FileDescriptionURI (..), ValidFileDescr import qualified Simplex.FileTransfer.Description as FD import Simplex.FileTransfer.Protocol (FileParty (..), FilePartyI) import Simplex.Messaging.Agent as Agent -import Simplex.Messaging.Agent.Client (AgentStatsKey (..), SubInfo (..), agentClientStore, getAgentWorkersDetails, getAgentWorkersSummary, temporaryAgentError) +import Simplex.Messaging.Agent.Client (AgentStatsKey (..), SubInfo (..), agentClientStore, getAgentWorkersDetails, getAgentWorkersSummary, temporaryAgentError, withLockMap) import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), InitialAgentServers (..), createAgentStore, defaultAgentConfig) -import Simplex.Messaging.Agent.Lock +import Simplex.Messaging.Agent.Lock (withLock) import Simplex.Messaging.Agent.Protocol import qualified Simplex.Messaging.Agent.Protocol as AP (AgentErrorType (..)) import Simplex.Messaging.Agent.Store.SQLite (MigrationConfirmation (..), MigrationError, SQLiteStore (dbNew), execSQL, upMigration, withConnection) @@ -227,6 +227,7 @@ newChatController connNetworkStatuses <- atomically TM.empty subscriptionMode <- newTVarIO SMSubscribe chatLock <- newEmptyTMVarIO + entityLocks <- atomically TM.empty sndFiles <- newTVarIO M.empty rcvFiles <- newTVarIO M.empty currentCalls <- atomically TM.empty @@ -263,6 +264,7 @@ newChatController connNetworkStatuses, subscriptionMode, chatLock, + entityLocks, sndFiles, rcvFiles, currentCalls, @@ -310,6 +312,40 @@ newChatController userServers :: User -> IO (NonEmpty (ProtoServerWithAuth p)) userServers user' = activeAgentServers config protocol <$> withTransaction chatStore (`getProtocolServers` user') +withChatLock :: String -> CM a -> CM a +withChatLock name action = asks chatLock >>= \l -> withLock l name action + +withEntityLock :: String -> ChatLockEntity -> CM a -> CM a +withEntityLock name entity action = do + chatLock <- asks chatLock + ls <- asks entityLocks + atomically $ unlessM (isEmptyTMVar chatLock) retry + withLockMap ls entity name action + +withInvitationLock :: String -> ByteString -> CM a -> CM a +withInvitationLock name = withEntityLock name . CLInvitation +{-# INLINE withInvitationLock #-} + +withConnectionLock :: String -> Int64 -> CM a -> CM a +withConnectionLock name = withEntityLock name . CLConnection +{-# INLINE withConnectionLock #-} + +withContactLock :: String -> ContactId -> CM a -> CM a +withContactLock name = withEntityLock name . CLContact +{-# INLINE withContactLock #-} + +withGroupLock :: String -> GroupId -> CM a -> CM a +withGroupLock name = withEntityLock name . CLGroup +{-# INLINE withGroupLock #-} + +withUserContactLock :: String -> Int64 -> CM a -> CM a +withUserContactLock name = withEntityLock name . CLUserContact +{-# INLINE withUserContactLock #-} + +withFileLock :: String -> Int64 -> CM a -> CM a +withFileLock name = withEntityLock name . CLFile +{-# INLINE withFileLock #-} + activeAgentServers :: UserProtocol p => ChatConfig -> SProtocolType p -> [ServerCfg p] -> NonEmpty (ProtoServerWithAuth p) activeAgentServers ChatConfig {defaultServers} p = fromMaybe (cfgServers p defaultServers) @@ -669,8 +705,8 @@ processChatCommand' vr = \case memStatuses -> pure $ Just $ map (uncurry MemberDeliveryStatus) memStatuses _ -> pure Nothing pure $ CRChatItemInfo user aci ChatItemInfo {itemVersions, memberDeliveryStatuses} - APISendMessage (ChatRef cType chatId) live itemTTL (ComposedMessage file_ quotedItemId_ mc) -> withUser $ \user -> withChatLock "sendMessage" $ case cType of - CTDirect -> do + APISendMessage (ChatRef cType chatId) live itemTTL (ComposedMessage file_ quotedItemId_ mc) -> withUser $ \user -> case cType of + CTDirect -> withContactLock "sendMessage" chatId $ do ct@Contact {contactId, contactUsed} <- withStore $ \db -> getContact db vr user chatId assertDirectAllowed user MDSnd ct XMsgNew_ unless contactUsed $ withStore' $ \db -> updateContactUsed db user ct @@ -707,7 +743,7 @@ processChatCommand' vr = \case quoteData ChatItem {content = CISndMsgContent qmc} = pure (qmc, CIQDirectSnd, True) quoteData ChatItem {content = CIRcvMsgContent qmc} = pure (qmc, CIQDirectRcv, False) quoteData _ = throwChatError CEInvalidQuote - CTGroup -> do + CTGroup -> withGroupLock "sendMessage" chatId $ do g@(Group gInfo _) <- withStore $ \db -> getGroup db vr user chatId assertUserGroupRole gInfo GRAuthor send g @@ -767,8 +803,8 @@ processChatCommand' vr = \case pure CIFile {fileId, fileName = takeFileName filePath, fileSize, fileSource = Just cf, fileStatus = CIFSSndStored, fileProtocol = FPLocal} let ci = mkChatItem cd ciId content ciFile_ Nothing Nothing Nothing False createdAt Nothing createdAt pure . CRNewChatItem user $ AChatItem SCTLocal SMDSnd (LocalChat nf) ci - APIUpdateChatItem (ChatRef cType chatId) itemId live mc -> withUser $ \user -> withChatLock "updateChatItem" $ case cType of - CTDirect -> do + APIUpdateChatItem (ChatRef cType chatId) itemId live mc -> withUser $ \user -> case cType of + CTDirect -> withContactLock "updateChatItem" chatId $ do ct@Contact {contactId} <- withStore $ \db -> getContact db vr user chatId assertDirectAllowed user MDSnd ct XMsgUpdate_ cci <- withStore $ \db -> getDirectCIWithReactions db user ct itemId @@ -790,7 +826,7 @@ processChatCommand' vr = \case else pure $ CRChatItemNotChanged user (AChatItem SCTDirect SMDSnd (DirectChat ct) ci) _ -> throwChatError CEInvalidChatItemUpdate CChatItem SMDRcv _ -> throwChatError CEInvalidChatItemUpdate - CTGroup -> do + CTGroup -> withGroupLock "updateChatItem" chatId $ do Group gInfo@GroupInfo {groupId} ms <- withStore $ \db -> getGroup db vr user chatId assertUserGroupRole gInfo GRAuthor cci <- withStore $ \db -> getGroupCIWithReactions db user gInfo itemId @@ -825,8 +861,8 @@ processChatCommand' vr = \case _ -> throwChatError CEInvalidChatItemUpdate CTContactRequest -> pure $ chatCmdError (Just user) "not supported" CTContactConnection -> pure $ chatCmdError (Just user) "not supported" - APIDeleteChatItem (ChatRef cType chatId) itemId mode -> withUser $ \user -> withChatLock "deleteChatItem" $ case cType of - CTDirect -> do + APIDeleteChatItem (ChatRef cType chatId) itemId mode -> withUser $ \user -> case cType of + CTDirect -> withContactLock "deleteChatItem" chatId $ do (ct, CChatItem msgDir ci@ChatItem {meta = CIMeta {itemSharedMsgId, editable}}) <- withStore $ \db -> (,) <$> getContact db vr user chatId <*> getDirectChatItem db user chatId itemId case (mode, msgDir, itemSharedMsgId, editable) of (CIDMInternal, _, _, _) -> deleteDirectCI user ct ci True False @@ -837,7 +873,7 @@ processChatCommand' vr = \case then deleteDirectCI user ct ci True False else markDirectCIDeleted user ct ci msgId True =<< liftIO getCurrentTime (CIDMBroadcast, _, _, _) -> throwChatError CEInvalidChatItemDelete - CTGroup -> do + CTGroup -> withGroupLock "deleteChatItem" chatId $ do Group gInfo ms <- withStore $ \db -> getGroup db vr user chatId CChatItem msgDir ci@ChatItem {meta = CIMeta {itemSharedMsgId, editable}} <- withStore $ \db -> getGroupChatItem db user chatId itemId case (mode, msgDir, itemSharedMsgId, editable) of @@ -852,7 +888,7 @@ processChatCommand' vr = \case deleteLocalCI user nf ci True False CTContactRequest -> pure $ chatCmdError (Just user) "not supported" CTContactConnection -> pure $ chatCmdError (Just user) "not supported" - APIDeleteMemberChatItem gId mId itemId -> withUser $ \user -> withChatLock "deleteChatItem" $ do + APIDeleteMemberChatItem gId mId itemId -> withUser $ \user -> withGroupLock "deleteChatItem" gId $ do Group gInfo@GroupInfo {membership} ms <- withStore $ \db -> getGroup db vr user gId CChatItem _ ci@ChatItem {chatDir, meta = CIMeta {itemSharedMsgId}} <- withStore $ \db -> getGroupChatItem db user gId itemId case (chatDir, itemSharedMsgId) of @@ -862,44 +898,46 @@ processChatCommand' vr = \case (SndMessage {msgId}, _) <- sendGroupMessage user gInfo ms $ XMsgDel itemSharedMId $ Just memberId delGroupChatItem user gInfo ci msgId (Just membership) (_, _) -> throwChatError CEInvalidChatItemDelete - APIChatItemReaction (ChatRef cType chatId) itemId add reaction -> withUser $ \user -> withChatLock "chatItemReaction" $ case cType of + APIChatItemReaction (ChatRef cType chatId) itemId add reaction -> withUser $ \user -> case cType of CTDirect -> - withStore (\db -> (,) <$> getContact db vr user chatId <*> getDirectChatItem db user chatId itemId) >>= \case - (ct, CChatItem md ci@ChatItem {meta = CIMeta {itemSharedMsgId = Just itemSharedMId}}) -> do - unless (featureAllowed SCFReactions forUser ct) $ - throwChatError (CECommandError $ "feature not allowed " <> T.unpack (chatFeatureNameText CFReactions)) - unless (ciReactionAllowed ci) $ - throwChatError (CECommandError "reaction not allowed - chat item has no content") - rs <- withStore' $ \db -> getDirectReactions db ct itemSharedMId True - checkReactionAllowed rs - (SndMessage {msgId}, _) <- sendDirectContactMessage user ct $ XMsgReact itemSharedMId Nothing reaction add - createdAt <- liftIO getCurrentTime - reactions <- withStore' $ \db -> do - setDirectReaction db ct itemSharedMId True reaction add msgId createdAt - liftIO $ getDirectCIReactions db ct itemSharedMId - let ci' = CChatItem md ci {reactions} - r = ACIReaction SCTDirect SMDSnd (DirectChat ct) $ CIReaction CIDirectSnd ci' createdAt reaction - pure $ CRChatItemReaction user add r - _ -> throwChatError $ CECommandError "reaction not possible - no shared item ID" + withContactLock "chatItemReaction" chatId $ + withStore (\db -> (,) <$> getContact db vr user chatId <*> getDirectChatItem db user chatId itemId) >>= \case + (ct, CChatItem md ci@ChatItem {meta = CIMeta {itemSharedMsgId = Just itemSharedMId}}) -> do + unless (featureAllowed SCFReactions forUser ct) $ + throwChatError (CECommandError $ "feature not allowed " <> T.unpack (chatFeatureNameText CFReactions)) + unless (ciReactionAllowed ci) $ + throwChatError (CECommandError "reaction not allowed - chat item has no content") + rs <- withStore' $ \db -> getDirectReactions db ct itemSharedMId True + checkReactionAllowed rs + (SndMessage {msgId}, _) <- sendDirectContactMessage user ct $ XMsgReact itemSharedMId Nothing reaction add + createdAt <- liftIO getCurrentTime + reactions <- withStore' $ \db -> do + setDirectReaction db ct itemSharedMId True reaction add msgId createdAt + liftIO $ getDirectCIReactions db ct itemSharedMId + let ci' = CChatItem md ci {reactions} + r = ACIReaction SCTDirect SMDSnd (DirectChat ct) $ CIReaction CIDirectSnd ci' createdAt reaction + pure $ CRChatItemReaction user add r + _ -> throwChatError $ CECommandError "reaction not possible - no shared item ID" CTGroup -> - withStore (\db -> (,) <$> getGroup db vr user chatId <*> getGroupChatItem db user chatId itemId) >>= \case - (Group g@GroupInfo {membership} ms, CChatItem md ci@ChatItem {meta = CIMeta {itemSharedMsgId = Just itemSharedMId}}) -> do - unless (groupFeatureAllowed SGFReactions g) $ - throwChatError (CECommandError $ "feature not allowed " <> T.unpack (chatFeatureNameText CFReactions)) - unless (ciReactionAllowed ci) $ - throwChatError (CECommandError "reaction not allowed - chat item has no content") - let GroupMember {memberId = itemMemberId} = chatItemMember g ci - rs <- withStore' $ \db -> getGroupReactions db g membership itemMemberId itemSharedMId True - checkReactionAllowed rs - (SndMessage {msgId}, _) <- sendGroupMessage user g ms (XMsgReact itemSharedMId (Just itemMemberId) reaction add) - createdAt <- liftIO getCurrentTime - reactions <- withStore' $ \db -> do - setGroupReaction db g membership itemMemberId itemSharedMId True reaction add msgId createdAt - liftIO $ getGroupCIReactions db g itemMemberId itemSharedMId - let ci' = CChatItem md ci {reactions} - r = ACIReaction SCTGroup SMDSnd (GroupChat g) $ CIReaction CIGroupSnd ci' createdAt reaction - pure $ CRChatItemReaction user add r - _ -> throwChatError $ CECommandError "reaction not possible - no shared item ID" + withGroupLock "chatItemReaction" chatId $ + withStore (\db -> (,) <$> getGroup db vr user chatId <*> getGroupChatItem db user chatId itemId) >>= \case + (Group g@GroupInfo {membership} ms, CChatItem md ci@ChatItem {meta = CIMeta {itemSharedMsgId = Just itemSharedMId}}) -> do + unless (groupFeatureAllowed SGFReactions g) $ + throwChatError (CECommandError $ "feature not allowed " <> T.unpack (chatFeatureNameText CFReactions)) + unless (ciReactionAllowed ci) $ + throwChatError (CECommandError "reaction not allowed - chat item has no content") + let GroupMember {memberId = itemMemberId} = chatItemMember g ci + rs <- withStore' $ \db -> getGroupReactions db g membership itemMemberId itemSharedMId True + checkReactionAllowed rs + (SndMessage {msgId}, _) <- sendGroupMessage user g ms (XMsgReact itemSharedMId (Just itemMemberId) reaction add) + createdAt <- liftIO getCurrentTime + reactions <- withStore' $ \db -> do + setGroupReaction db g membership itemMemberId itemSharedMId True reaction add msgId createdAt + liftIO $ getGroupCIReactions db g itemMemberId itemSharedMId + let ci' = CChatItem md ci {reactions} + r = ACIReaction SCTGroup SMDSnd (GroupChat g) $ CIReaction CIGroupSnd ci' createdAt reaction + pure $ CRChatItemReaction user add r + _ -> throwChatError $ CECommandError "reaction not possible - no shared item ID" CTLocal -> pure $ chatCmdError (Just user) "not supported" CTContactRequest -> pure $ chatCmdError (Just user) "not supported" CTContactConnection -> pure $ chatCmdError (Just user) "not supported" @@ -959,7 +997,7 @@ processChatCommand' vr = \case CTDirect -> do ct <- withStore $ \db -> getContact db vr user chatId filesInfo <- withStore' $ \db -> getContactFileInfo db user ct - withChatLock "deleteChat direct" . procCmd $ do + withContactLock "deleteChat direct" chatId . procCmd $ do cancelFilesInProgress user filesInfo deleteFilesLocally filesInfo let doSendDel = contactReady ct && contactActive ct && notify @@ -971,7 +1009,7 @@ processChatCommand' vr = \case withStore' $ \db -> deleteContactConnectionsAndFiles db userId ct withStore $ \db -> deleteContact db user ct pure $ CRContactDeleted user ct - CTContactConnection -> withChatLock "deleteChat contactConnection" . procCmd $ do + CTContactConnection -> withConnectionLock "deleteChat contactConnection" chatId . procCmd $ do conn@PendingContactConnection {pccAgentConnId = AgentConnId acId} <- withStore $ \db -> getPendingContactConnection db userId chatId deleteAgentConnectionAsync user acId withStore' $ \db -> deletePendingContactConnection db userId chatId @@ -983,7 +1021,7 @@ processChatCommand' vr = \case canDelete = isOwner || not (memberCurrent membership) unless canDelete $ throwChatError $ CEGroupUserRole gInfo GROwner filesInfo <- withStore' $ \db -> getGroupFileInfo db user gInfo - withChatLock "deleteChat group" . procCmd $ do + withGroupLock "deleteChat group" chatId . procCmd $ do cancelFilesInProgress user filesInfo deleteFilesLocally filesInfo let doSendDel = memberActive membership && isOwner @@ -1038,28 +1076,29 @@ processChatCommand' vr = \case CTLocal -> do nf <- withStore $ \db -> getNoteFolder db user chatId filesInfo <- withStore' $ \db -> getNoteFolderFileInfo db user nf - withChatLock "clearChat local" . procCmd $ do - deleteFilesLocally filesInfo - withStore' $ \db -> deleteNoteFolderFiles db userId nf - withStore' $ \db -> deleteNoteFolderCIs db user nf - pure $ CRChatCleared user (AChatInfo SCTLocal $ LocalChat nf) + deleteFilesLocally filesInfo + withStore' $ \db -> deleteNoteFolderFiles db userId nf + withStore' $ \db -> deleteNoteFolderCIs db user nf + pure $ CRChatCleared user (AChatInfo SCTLocal $ LocalChat nf) CTContactConnection -> pure $ chatCmdError (Just user) "not supported" CTContactRequest -> pure $ chatCmdError (Just user) "not supported" - APIAcceptContact incognito connReqId -> withUser $ \_ -> withChatLock "acceptContact" $ do + APIAcceptContact incognito connReqId -> withUser $ \_ -> do (user@User {userId}, cReq@UserContactRequest {userContactLinkId}) <- withStore $ \db -> getContactRequest' db connReqId - ucl <- withStore $ \db -> getUserContactLinkById db userId userContactLinkId - let contactUsed = (\(_, groupId_, _) -> isNothing groupId_) ucl - -- [incognito] generate profile to send, create connection with incognito profile - incognitoProfile <- if incognito then Just . NewIncognito <$> liftIO generateRandomProfile else pure Nothing - ct <- acceptContactRequest user cReq incognitoProfile contactUsed - pure $ CRAcceptingContactRequest user ct - APIRejectContact connReqId -> withUser $ \user -> withChatLock "rejectContact" $ do - cReq@UserContactRequest {agentContactConnId = AgentConnId connId, agentInvitationId = AgentInvId invId} <- + withUserContactLock "acceptContact" userContactLinkId $ do + ucl <- withStore $ \db -> getUserContactLinkById db userId userContactLinkId + let contactUsed = (\(_, groupId_, _) -> isNothing groupId_) ucl + -- [incognito] generate profile to send, create connection with incognito profile + incognitoProfile <- if incognito then Just . NewIncognito <$> liftIO generateRandomProfile else pure Nothing + ct <- acceptContactRequest user cReq incognitoProfile contactUsed + pure $ CRAcceptingContactRequest user ct + APIRejectContact connReqId -> withUser $ \user -> do + cReq@UserContactRequest {userContactLinkId, agentContactConnId = AgentConnId connId, agentInvitationId = AgentInvId invId} <- withStore $ \db -> getContactRequest db user connReqId `storeFinally` liftIO (deleteContactRequest db user connReqId) - withAgent $ \a -> rejectContact a connId invId - pure $ CRContactRequestRejected user cReq + withUserContactLock "rejectContact" userContactLinkId $ do + withAgent $ \a -> rejectContact a connId invId + pure $ CRContactRequestRejected user cReq APISendCallInvitation contactId callType -> withUser $ \user -> do -- party initiating call ct <- withStore $ \db -> getContact db vr user contactId @@ -1067,7 +1106,7 @@ processChatCommand' vr = \case if featureAllowed SCFCalls forUser ct then do calls <- asks currentCalls - withChatLock "sendCallInvitation" $ do + withContactLock "sendCallInvitation" contactId $ do g <- asks random callId <- atomically $ CallId <$> C.randomBytes 16 g dhKeyPair <- atomically $ if encryptedCall callType then Just <$> C.generateKeyPair g else pure Nothing @@ -1192,12 +1231,11 @@ processChatCommand' vr = \case toServerCfg server = ServerCfg {server, preset = True, tested = Nothing, enabled = True} GetUserProtoServers aProtocol -> withUser $ \User {userId} -> processChatCommand $ APIGetUserProtoServers userId aProtocol - APISetUserProtoServers userId (APSC p (ProtoServersConfig servers)) -> withUserId userId $ \user -> withServerProtocol p $ - withChatLock "setUserSMPServers" $ do - withStore $ \db -> overwriteProtocolServers db user servers - cfg <- asks config - lift $ withAgent' $ \a -> setProtocolServers a (aUserId user) $ activeAgentServers cfg p servers - ok user + APISetUserProtoServers userId (APSC p (ProtoServersConfig servers)) -> withUserId userId $ \user -> withServerProtocol p $ do + withStore $ \db -> overwriteProtocolServers db user servers + cfg <- asks config + lift $ withAgent' $ \a -> setProtocolServers a (aUserId user) $ activeAgentServers cfg p servers + ok user SetUserProtoServers serversConfig -> withUser $ \User {userId} -> processChatCommand $ APISetUserProtoServers userId serversConfig APITestProtoServer userId srv@(AProtoServerWithAuth _ server) -> withUserId userId $ \user -> @@ -1300,7 +1338,7 @@ processChatCommand' vr = \case connectionStats <- withAgent $ \a -> abortConnectionSwitch a connId pure $ CRGroupMemberSwitchAborted user g m connectionStats _ -> throwChatError CEGroupMemberNotActive - APISyncContactRatchet contactId force -> withUser $ \user -> withChatLock "syncContactRatchet" $ do + APISyncContactRatchet contactId force -> withUser $ \user -> withContactLock "syncContactRatchet" contactId $ do ct <- withStore $ \db -> getContact db vr user contactId case contactConn ct of Just conn@Connection {pqSupport} -> do @@ -1308,7 +1346,7 @@ processChatCommand' vr = \case createInternalChatItem user (CDDirectSnd ct) (CISndConnEvent $ SCERatchetSync rss Nothing) Nothing pure $ CRContactRatchetSyncStarted user ct cStats Nothing -> throwChatError $ CEContactNotActive ct - APISyncGroupMemberRatchet gId gMemberId force -> withUser $ \user -> withChatLock "syncGroupMemberRatchet" $ do + APISyncGroupMemberRatchet gId gMemberId force -> withUser $ \user -> withGroupLock "syncGroupMemberRatchet" gId $ do (g, m) <- withStore $ \db -> (,) <$> getGroupInfo db vr user gId <*> getGroupMember db vr user gId gMemberId case memberConnId m of Just connId -> do @@ -1397,7 +1435,7 @@ processChatCommand' vr = \case EnableGroupMember gName mName -> withMemberName gName mName $ \gId mId -> APIEnableGroupMember gId mId ChatHelp section -> pure $ CRChatHelp section Welcome -> withUser $ pure . CRWelcome - APIAddContact userId incognito -> withUserId userId $ \user -> withChatLock "addContact" . procCmd $ do + APIAddContact userId incognito -> withUserId userId $ \user -> procCmd $ do -- [incognito] generate profile for connection incognitoProfile <- if incognito then Just <$> liftIO generateRandomProfile else pure Nothing subMode <- chatReadVar subscriptionMode @@ -1424,9 +1462,8 @@ processChatCommand' vr = \case Just conn' -> pure $ CRConnectionIncognitoUpdated user conn' Nothing -> throwChatError CEConnectionIncognitoChangeProhibited APIConnectPlan userId cReqUri -> withUserId userId $ \user -> - withChatLock "connectPlan" . procCmd $ - CRConnectionPlan user <$> connectPlan user cReqUri - APIConnect userId incognito (Just (ACR SCMInvitation cReq)) -> withUserId userId $ \user -> withChatLock "connect" . procCmd $ do + CRConnectionPlan user <$> connectPlan user cReqUri + APIConnect userId incognito (Just (ACR SCMInvitation cReq)) -> withUserId userId $ \user -> withInvitationLock "connect" (strEncode cReq) . procCmd $ do subMode <- chatReadVar subscriptionMode -- [incognito] generate profile to send incognitoProfile <- if incognito then Just <$> liftIO generateRandomProfile else pure Nothing @@ -1471,7 +1508,7 @@ processChatCommand' vr = \case CRContactsList user <$> withStore' (\db -> getUserContacts db vr user) ListContacts -> withUser $ \User {userId} -> processChatCommand $ APIListContacts userId - APICreateMyAddress userId -> withUserId userId $ \user -> withChatLock "createMyAddress" . procCmd $ do + APICreateMyAddress userId -> withUserId userId $ \user -> procCmd $ do subMode <- chatReadVar subscriptionMode -- TODO v5.7 pass IPPQOn (connId, cReq) <- withAgent $ \a -> createConnection a (aUserId user) True SCMContact Nothing IKPQOff subMode @@ -1636,7 +1673,7 @@ processChatCommand' vr = \case pure $ CRGroupCreated user groupInfo NewGroup incognito gProfile -> withUser $ \User {userId} -> processChatCommand $ APINewGroup userId incognito gProfile - APIAddMember groupId contactId memRole -> withUser $ \user -> withChatLock "addMember" $ do + APIAddMember groupId contactId memRole -> withUser $ \user -> withGroupLock "addMember" groupId $ do -- TODO for large groups: no need to load all members to determine if contact is a member (group, contact) <- withStore $ \db -> (,) <$> getGroup db vr user groupId <*> getContact db vr user contactId assertDirectAllowed user MDSnd contact XGrpInv_ @@ -1666,7 +1703,7 @@ processChatCommand' vr = \case Nothing -> throwChatError $ CEGroupCantResendInvitation gInfo cName | otherwise -> throwChatError $ CEGroupDuplicateMember cName APIJoinGroup groupId -> withUser $ \user@User {userId} -> do - withChatLock "joinGroup" . procCmd $ do + withGroupLock "joinGroup" groupId . procCmd $ do (invitation, ct) <- withStore $ \db -> do inv@ReceivedGroupInvitation {fromMember} <- getGroupInvitation db vr user groupId (inv,) <$> getContactViaMember db vr user fromMember @@ -1697,7 +1734,7 @@ processChatCommand' vr = \case changeMemberRole user gInfo members m gEvent = do let GroupMember {memberId = mId, memberRole = mRole, memberStatus = mStatus, memberContactId, localDisplayName = cName} = m assertUserGroupRole gInfo $ maximum [GRAdmin, mRole, memRole] - withChatLock "memberRole" . procCmd $ do + withGroupLock "memberRole" groupId . procCmd $ do unless (mRole == memRole) $ do withStore' $ \db -> updateGroupMemberRole db user m memRole case mStatus of @@ -1719,7 +1756,7 @@ processChatCommand' vr = \case let GroupMember {memberId = bmMemberId, memberRole = bmRole, memberProfile = bmp} = bm assertUserGroupRole gInfo $ max GRAdmin bmRole when (blocked == blockedByAdmin bm) $ throwChatError $ CECommandError $ if blocked then "already blocked" else "already unblocked" - withChatLock "blockForAll" . procCmd $ do + withGroupLock "blockForAll" groupId . procCmd $ do let mrs = if blocked then MRSBlocked else MRSUnrestricted event = XGrpMemRestrict bmMemberId MemberRestrictions {restriction = mrs} (msg, _) <- sendGroupMessage' user gInfo remainingMembers event @@ -1741,7 +1778,7 @@ processChatCommand' vr = \case Nothing -> throwChatError CEGroupMemberNotFound Just m@GroupMember {memberId = mId, memberRole = mRole, memberStatus = mStatus, memberProfile} -> do assertUserGroupRole gInfo $ max GRAdmin mRole - withChatLock "removeMember" . procCmd $ do + withGroupLock "removeMember" groupId . procCmd $ do case mStatus of GSMemInvited -> do deleteMemberConnection user m @@ -1757,7 +1794,7 @@ processChatCommand' vr = \case APILeaveGroup groupId -> withUser $ \user@User {userId} -> do Group gInfo@GroupInfo {membership} members <- withStore $ \db -> getGroup db vr user groupId filesInfo <- withStore' $ \db -> getGroupFileInfo db user gInfo - withChatLock "leaveGroup" . procCmd $ do + withGroupLock "leaveGroup" groupId . procCmd $ do cancelFilesInProgress user filesInfo (msg, _) <- sendGroupMessage' user gInfo members XGrpLeave ci <- saveSndChatItem user (CDGroupSnd gInfo) msg (CISndGroupEvent SGEUserLeft) @@ -1807,7 +1844,7 @@ processChatCommand' vr = \case updateGroupProfileByName gName $ \p -> p {description} ShowGroupDescription gName -> withUser $ \user -> CRGroupDescription user <$> withStore (\db -> getGroupInfoByName db vr user gName) - APICreateGroupLink groupId mRole -> withUser $ \user -> withChatLock "createGroupLink" $ do + APICreateGroupLink groupId mRole -> withUser $ \user -> withGroupLock "createGroupLink" groupId $ do gInfo <- withStore $ \db -> getGroupInfo db vr user groupId assertUserGroupRole gInfo GRAdmin when (mRole > GRMember) $ throwChatError $ CEGroupMemberInitialRole gInfo mRole @@ -1817,14 +1854,14 @@ processChatCommand' vr = \case (connId, cReq) <- withAgent $ \a -> createConnection a (aUserId user) True SCMContact (Just crClientData) IKPQOff subMode withStore $ \db -> createGroupLink db user gInfo connId cReq groupLinkId mRole subMode pure $ CRGroupLinkCreated user gInfo cReq mRole - APIGroupLinkMemberRole groupId mRole' -> withUser $ \user -> withChatLock "groupLinkMemberRole " $ do + APIGroupLinkMemberRole groupId mRole' -> withUser $ \user -> withGroupLock "groupLinkMemberRole" groupId $ do gInfo <- withStore $ \db -> getGroupInfo db vr user groupId (groupLinkId, groupLink, mRole) <- withStore $ \db -> getGroupLink db user gInfo assertUserGroupRole gInfo GRAdmin when (mRole' > GRMember) $ throwChatError $ CEGroupMemberInitialRole gInfo mRole' when (mRole' /= mRole) $ withStore' $ \db -> setGroupLinkMemberRole db user groupLinkId mRole' pure $ CRGroupLink user gInfo groupLink mRole' - APIDeleteGroupLink groupId -> withUser $ \user -> withChatLock "deleteGroupLink" $ do + APIDeleteGroupLink groupId -> withUser $ \user -> withGroupLock "deleteGroupLink" groupId $ do gInfo <- withStore $ \db -> getGroupInfo db vr user groupId deleteGroupLink' user gInfo pure $ CRGroupLinkDeleted user gInfo @@ -1932,19 +1969,19 @@ processChatCommand' vr = \case ForwardImage chatName fileId -> forwardFile chatName fileId SendImage SendFileDescription _chatName _f -> pure $ chatCmdError Nothing "TODO" ReceiveFile fileId encrypted_ rcvInline_ filePath_ -> withUser $ \_ -> - withChatLock "receiveFile" . procCmd $ do + withFileLock "receiveFile" fileId . procCmd $ do (user, ft) <- withStore (`getRcvFileTransferById` fileId) encrypt <- (`fromMaybe` encrypted_) <$> chatReadVar encryptLocalFiles ft' <- (if encrypt then setFileToEncrypt else pure) ft receiveFile' user ft' rcvInline_ filePath_ SetFileToReceive fileId encrypted_ -> withUser $ \_ -> do - withChatLock "setFileToReceive" . procCmd $ do + withFileLock "setFileToReceive" fileId . procCmd $ do encrypt <- (`fromMaybe` encrypted_) <$> chatReadVar encryptLocalFiles cfArgs <- if encrypt then Just <$> (atomically . CF.randomArgs =<< asks random) else pure Nothing withStore' $ \db -> setRcvFileToReceive db fileId cfArgs ok_ CancelFile fileId -> withUser $ \user@User {userId} -> - withChatLock "cancelFile" . procCmd $ + withFileLock "cancelFile" fileId . procCmd $ withStore (\db -> getFileTransfer db user fileId) >>= \case FTSnd ftm@FileTransferMeta {xftpSndFile, cancelled} fts | cancelled -> throwChatError $ CEFileCancel fileId "file already cancelled" @@ -2074,8 +2111,18 @@ processChatCommand' vr = \case pure $ CRVersionInfo {versionInfo, chatMigrations, agentMigrations} DebugLocks -> lift $ do chatLockName <- atomically . tryReadTMVar =<< asks chatLock + chatEntityLocks <- getLocks =<< asks entityLocks agentLocks <- withAgent' debugAgentLocks - pure CRDebugLocks {chatLockName, agentLocks} + pure CRDebugLocks {chatLockName, chatEntityLocks, agentLocks} + where + getLocks ls = atomically $ M.mapKeys enityLockString . M.mapMaybe id <$> (mapM tryReadTMVar =<< readTVar ls) + enityLockString cle = case cle of + CLInvitation bs -> "Invitation " <> B.unpack bs + CLConnection connId -> "Connection " <> show connId + CLContact ctId -> "Contact " <> show ctId + CLGroup gId -> "Group " <> show gId + CLUserContact ucId -> "UserContact " <> show ucId + CLFile fId -> "File " <> show fId GetAgentWorkers -> lift $ CRAgentWorkersSummary <$> withAgent' getAgentWorkersSummary GetAgentWorkersDetails -> lift $ CRAgentWorkersDetails <$> withAgent' getAgentWorkersDetails GetAgentStats -> lift $ CRAgentStats . map stat <$> withAgent' getAgentStats @@ -2101,7 +2148,6 @@ processChatCommand' vr = \case -- in a modified CLI app or core - the hook should return Either ChatResponse ChatCommand CustomChatCommand _cmd -> withUser $ \user -> pure $ chatCmdError (Just user) "not supported" where - withChatLock name action = asks chatLock >>= \l -> withLock l name action -- below code would make command responses asynchronous where they can be slow -- in View.hs `r'` should be defined as `id` in this case -- procCmd :: m ChatResponse -> m ChatResponse @@ -2167,7 +2213,7 @@ processChatCommand' vr = \case CTLocal -> withStore $ \db -> getLocalChatItemIdByText' db user cId msg _ -> throwChatError $ CECommandError "not supported" connectViaContact :: User -> IncognitoEnabled -> ConnectionRequestUri 'CMContact -> CM ChatResponse - connectViaContact user@User {userId} incognito cReq@(CRContactUri ConnReqUriData {crClientData}) = withChatLock "connectViaContact" $ do + connectViaContact user@User {userId} incognito cReq@(CRContactUri ConnReqUriData {crClientData}) = withInvitationLock "connectViaContact" (strEncode cReq) $ do let groupLinkId = crClientData >>= decodeJSON >>= \(CRDataGroup gli) -> Just gli cReqHash = ConnReqUriHash . C.sha256Hash $ strEncode cReq case groupLinkId of @@ -2198,7 +2244,7 @@ processChatCommand' vr = \case pure $ CRSentInvitation user conn incognitoProfile connectContactViaAddress :: User -> IncognitoEnabled -> Contact -> ConnectionRequestUri 'CMContact -> CM ChatResponse connectContactViaAddress user incognito ct cReq = - withChatLock "connectViaContact" $ do + withInvitationLock "connectContactViaAddress" (strEncode cReq) $ do newXContactId <- XContactId <$> drgRandomBytes 16 pqSup <- chatReadVar pqExperimentalEnabled (connId, incognitoProfile, subMode, chatV) <- requestContact user incognito cReq newXContactId False pqSup @@ -2265,8 +2311,9 @@ processChatCommand' vr = \case -- [incognito] filter out contacts with whom user has incognito connections addChangedProfileContact :: User -> Contact -> [ChangedProfileContact] -> [ChangedProfileContact] addChangedProfileContact user' ct changedCts = case contactSendConn_ ct' of - Right conn | not (connIncognito conn) && mergedProfile' /= mergedProfile -> - ChangedProfileContact ct ct' mergedProfile' conn : changedCts + Right conn + | not (connIncognito conn) && mergedProfile' /= mergedProfile -> + ChangedProfileContact ct ct' mergedProfile' conn : changedCts _ -> changedCts where mergedProfile = userProfileToSend user Nothing (Just ct) False @@ -2289,7 +2336,7 @@ processChatCommand' vr = \case let mergedProfile = userProfileToSend user (fromLocalProfile <$> incognitoProfile) (Just ct) False mergedProfile' = userProfileToSend user (fromLocalProfile <$> incognitoProfile) (Just ct') False when (mergedProfile' /= mergedProfile) $ - withChatLock "updateProfile" $ do + withContactLock "updateProfile" (contactId' ct) $ do void (sendDirectContactMessage user ct' $ XInfo mergedProfile') `catchChatError` (toView . CRChatError (Just user)) lift . when (directOrUsed ct') $ createSndFeatureItems user ct ct' pure $ CRContactPrefsUpdated user ct ct' @@ -2334,7 +2381,7 @@ processChatCommand' vr = \case user <- getUserByContactId db ctId (user,) <$> getContact db vr user ctId calls <- asks currentCalls - withChatLock "currentCall" $ + withContactLock "currentCall" ctId $ atomically (TM.lookup ctId calls) >>= \case Nothing -> throwChatError CENoCurrentCall Just call@Call {contactId} @@ -2988,21 +3035,16 @@ deleteGroupLink_ user gInfo conn = do agentSubscriber :: CM' () agentSubscriber = do q <- asks $ subQ . smpAgent - l <- asks chatLock - forever $ atomically (readTBQueue q) >>= process l + forever $ atomically (readTBQueue q) >>= process where - process :: Lock -> (ACorrId, EntityId, APartyCmd 'Agent) -> CM' () - process l (corrId, entId, APC e msg) = run $ case e of + process :: (ACorrId, EntityId, APartyCmd 'Agent) -> CM' () + process (corrId, entId, APC e msg) = run $ case e of SAENone -> processAgentMessageNoConn msg SAEConn -> processAgentMessage corrId entId msg SAERcvFile -> processAgentMsgRcvFile corrId entId msg SAESndFile -> processAgentMsgSndFile corrId entId msg where - run action = do - let name = "agentSubscriber entity=" <> show e <> " entId=" <> str entId <> " msg=" <> str (aCommandTag msg) - withLock' l name $ action `catchChatError'` (toView' . CRChatError Nothing) - str :: StrEncoding a => a -> String - str = B.unpack . strEncode + run action = action `catchChatError'` (toView' . CRChatError Nothing) type AgentBatchSubscribe = AgentClient -> [ConnId] -> ExceptT AgentErrorType IO (Map ConnId (Either AgentErrorType ())) @@ -3150,8 +3192,7 @@ subscribeUserConnections vr onlyNeeded agentBatchSubscribe user = do forM_ err_ $ toView . CRSndFileSubError user ft void . forkIO $ do threadDelay 1000000 - l <- asks chatLock - when (fileStatus == FSConnected) . unlessM (isFileActive fileId sndFiles) . withLock l "subscribe sendFileChunk" $ + when (fileStatus == FSConnected) . unlessM (isFileActive fileId sndFiles) . withChatLock "subscribe sendFileChunk" $ sendFileChunk user ft rcvFileSubsToView :: Map ConnId (Either AgentErrorType ()) -> Map ConnId RcvFileTransfer -> CM () rcvFileSubsToView rs = mapM_ (toView . uncurry (CRRcvFileSubError user)) . filterErrors . resultsFor rs @@ -3317,11 +3358,13 @@ processAgentMessage _ connId (DEL_RCVQ srv qId err_) = processAgentMessage _ connId DEL_CONN = toView $ CRAgentConnDeleted (AgentConnId connId) processAgentMessage corrId connId msg = do - vr <- chatVersionRange - -- getUserByAConnId never throws logical errors, only SEDBBusyError can be thrown here - critical (withStore' (`getUserByAConnId` AgentConnId connId)) >>= \case - Just user -> processAgentMessageConn vr user corrId connId msg `catchChatError` (toView . CRChatError (Just user)) - _ -> throwChatError $ CENoConnectionUser (AgentConnId connId) + lockEntity <- critical (withStore (`getChatLockEntity` AgentConnId connId)) + withEntityLock "processAgentMessage" lockEntity $ do + vr <- chatVersionRange + -- getUserByAConnId never throws logical errors, only SEDBBusyError can be thrown here + critical (withStore' (`getUserByAConnId` AgentConnId connId)) >>= \case + Just user -> processAgentMessageConn vr user corrId connId msg `catchChatError` (toView . CRChatError (Just user)) + _ -> throwChatError $ CENoConnectionUser (AgentConnId connId) -- CRITICAL error will be shown to the user as alert with restart button in Android/desktop apps. -- SEDBBusyError will only be thrown on IO exceptions or SQLError during DB queries, @@ -3358,18 +3401,18 @@ processAgentMessageNoConn = \case toView $ event srv cs processAgentMsgSndFile :: ACorrId -> SndFileId -> ACommand 'Agent 'AESndFile -> CM () -processAgentMsgSndFile _corrId aFileId msg = - withStore' (`getUserByASndFileId` AgentSndFileId aFileId) >>= \case - Just user -> process user `catchChatError` (toView . CRChatError (Just user)) - _ -> do - lift $ withAgent' (`xftpDeleteSndFileInternal` aFileId) - throwChatError $ CENoSndFileUser $ AgentSndFileId aFileId +processAgentMsgSndFile _corrId aFileId msg = do + fileId <- withStore (`getXFTPSndFileDBId` AgentSndFileId aFileId) + withFileLock "processAgentMsgSndFile" fileId $ + withStore' (`getUserByASndFileId` AgentSndFileId aFileId) >>= \case + Just user -> process user fileId `catchChatError` (toView . CRChatError (Just user)) + _ -> do + lift $ withAgent' (`xftpDeleteSndFileInternal` aFileId) + throwChatError $ CENoSndFileUser $ AgentSndFileId aFileId where - process :: User -> CM () - process user = do - (ft@FileTransferMeta {fileId, xftpRedirectFor, cancelled}, sfts) <- withStore $ \db -> do - fileId <- getXFTPSndFileDBId db user $ AgentSndFileId aFileId - getSndFileTransfer db user fileId + process :: User -> FileTransferId -> CM () + process user fileId = do + (ft@FileTransferMeta {xftpRedirectFor, cancelled}, sfts) <- withStore $ \db -> getSndFileTransfer db user fileId vr <- chatVersionRange unless cancelled $ case msg of SFPROG sndProgress sndTotal -> do @@ -3386,11 +3429,11 @@ processAgentMsgSndFile _corrId aFileId msg = lift $ withAgent' (`xftpDeleteSndFileInternal` aFileId) withStore' $ \db -> createExtraSndFTDescrs db user fileId (map fileDescrText rfds) case rfds of - [] -> sendFileError "no receiver descriptions" fileId vr ft + [] -> sendFileError "no receiver descriptions" vr ft rfd : _ -> case [fd | fd@(FD.ValidFileDescription FD.FileDescription {chunks = [_]}) <- rfds] of [] -> case xftpRedirectFor of Nothing -> xftpSndFileRedirect user fileId rfd >>= toView . CRSndFileRedirectStartXFTP user ft - Just _ -> sendFileError "Prohibit chaining redirects" fileId vr ft + Just _ -> sendFileError "Prohibit chaining redirects" vr ft rfds' -> do -- we have 1 chunk - use it as URI whether it is redirect or not ft' <- maybe (pure ft) (\fId -> withStore $ \db -> getFileTransferMeta db user fId) xftpRedirectFor @@ -3439,7 +3482,7 @@ processAgentMsgSndFile _corrId aFileId msg = | temporaryAgentError e -> throwChatError $ CEXFTPSndFile fileId (AgentSndFileId aFileId) e | otherwise -> - sendFileError (tshow e) fileId vr ft + sendFileError (tshow e) vr ft where fileDescrText :: FilePartyI p => ValidFileDescription p -> T.Text fileDescrText = safeDecodeUtf8 . strEncode @@ -3457,8 +3500,8 @@ processAgentMsgSndFile _corrId aFileId msg = case L.nonEmpty fds of Just fds' -> loopSend fds' Nothing -> pure msgDeliveryId - sendFileError :: Text -> Int64 -> (PQSupport -> VersionRangeChat) -> FileTransferMeta -> CM () - sendFileError err fileId vr ft = do + sendFileError :: Text -> (PQSupport -> VersionRangeChat) -> FileTransferMeta -> CM () + sendFileError err vr ft = do logError $ "Sent file error: " <> err ci <- withStore $ \db -> do liftIO $ updateFileCancelled db user fileId CIFSSndError @@ -3480,18 +3523,18 @@ splitFileDescr rfdText = do else fileDescr <| splitParts (partNo + 1) partSize rest processAgentMsgRcvFile :: ACorrId -> RcvFileId -> ACommand 'Agent 'AERcvFile -> CM () -processAgentMsgRcvFile _corrId aFileId msg = - withStore' (`getUserByARcvFileId` AgentRcvFileId aFileId) >>= \case - Just user -> process user `catchChatError` (toView . CRChatError (Just user)) - _ -> do - lift $ withAgent' (`xftpDeleteRcvFile` aFileId) - throwChatError $ CENoRcvFileUser $ AgentRcvFileId aFileId +processAgentMsgRcvFile _corrId aFileId msg = do + fileId <- withStore (`getXFTPRcvFileDBId` AgentRcvFileId aFileId) + withFileLock "processAgentMsgRcvFile" fileId $ + withStore' (`getUserByARcvFileId` AgentRcvFileId aFileId) >>= \case + Just user -> process user fileId `catchChatError` (toView . CRChatError (Just user)) + _ -> do + lift $ withAgent' (`xftpDeleteRcvFile` aFileId) + throwChatError $ CENoRcvFileUser $ AgentRcvFileId aFileId where - process :: User -> CM () - process user = do - ft@RcvFileTransfer {fileId} <- withStore $ \db -> do - fileId <- getXFTPRcvFileDBId db $ AgentRcvFileId aFileId - getRcvFileTransfer db user fileId + process :: User -> FileTransferId -> CM () + process user fileId = do + ft <- withStore $ \db -> getRcvFileTransfer db user fileId vr <- chatVersionRange unless (rcvFileCompleteOrCancelled ft) $ case msg of RFPROG rcvProgress rcvTotal -> do @@ -3597,7 +3640,8 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage = -- probably this branch is never executed, so there should be no reason -- to save message if contact hasn't been created yet - chat item isn't created anyway withAckMessage' agentConnId meta $ - void $ saveDirectRcvMSG conn meta msgBody + void $ + saveDirectRcvMSG conn meta msgBody SENT msgId -> sentMsgDeliveryEvent conn msgId OK -> @@ -3634,7 +3678,6 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage = (conn'', msg@RcvMessage {chatMsgEvent = ACME _ event}) <- saveDirectRcvMSG conn' msgMeta msgBody let ct'' = ct' {activeConn = Just conn''} :: Contact assertDirectAllowed user MDRcv ct'' $ toCMEventTag event - updateChatLock "direct message" event case event of XMsgNew mc -> newContentMessage ct'' mc msg msgMeta XMsgFileDescr sharedMsgId fileDescr -> messageFileDescription ct'' sharedMsgId fileDescr @@ -4053,7 +4096,6 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage = processEvent :: MsgEncodingI e => ChatMessage e -> CM () processEvent chatMsg = do (m', conn', msg@RcvMessage {chatMsgEvent = ACME _ event}) <- saveGroupRcvMsg user groupId m conn msgMeta msgBody chatMsg - updateChatLock "groupMessage" event case event of XMsgNew mc -> memberCanSend m' $ newGroupContentMessage gInfo m' mc msg brokerTs False XMsgFileDescr sharedMsgId fileDescr -> memberCanSend m' $ groupMessageFileDescription gInfo m' sharedMsgId fileDescr @@ -4389,13 +4431,6 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage = toView $ CRConnectionDisabled connEntity _ -> pure () - updateChatLock :: MsgEncodingI enc => String -> ChatMsgEvent enc -> CM () - updateChatLock name event = do - l <- asks chatLock - atomically $ tryReadTMVar l >>= mapM_ (swapTMVar l . (<> s)) - where - s = " " <> name <> "=" <> B.unpack (strEncode $ toCMEventTag event) - -- TODO v5.7 / v6.0 - together with deprecating old group protocol establishing direct connections? -- we could save command records only for agent APIs we process continuations for (INV) withCompletedCommand :: forall e. AEntityI e => Connection -> ACommand 'Agent e -> (CommandData -> CM ()) -> CM () @@ -4433,9 +4468,9 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage = -- This prevents losing the message that failed to be processed. Left (ChatErrorStore SEDBBusyError {message}) | showCritical -> throwError $ ChatErrorAgent (CRITICAL True message) Nothing Left e -> ackMsg msgMeta Nothing >> throwError e - where - ackMsg :: MsgMeta -> Maybe MsgReceiptInfo -> CM () - ackMsg MsgMeta {recipient = (msgId, _)} rcpt = withAgent $ \a -> ackMessageAsync a "" cId msgId rcpt + where + ackMsg :: MsgMeta -> Maybe MsgReceiptInfo -> CM () + ackMsg MsgMeta {recipient = (msgId, _)} rcpt = withAgent $ \a -> ackMessageAsync a "" cId msgId rcpt sentMsgDeliveryEvent :: Connection -> AgentMsgId -> CM () sentMsgDeliveryEvent Connection {connId} msgId = diff --git a/src/Simplex/Chat/Controller.hs b/src/Simplex/Chat/Controller.hs index 0291793843..024757e7bb 100644 --- a/src/Simplex/Chat/Controller.hs +++ b/src/Simplex/Chat/Controller.hs @@ -59,7 +59,7 @@ import Simplex.Chat.Messages.CIContent import Simplex.Chat.Protocol import Simplex.Chat.Remote.AppVersion import Simplex.Chat.Remote.Types -import Simplex.Chat.Store (AutoAccept, StoreError (..), UserContactLink, UserMsgReceiptSettings) +import Simplex.Chat.Store (AutoAccept, ChatLockEntity, StoreError (..), UserContactLink, UserMsgReceiptSettings) import Simplex.Chat.Types import Simplex.Chat.Types.Preferences import Simplex.Chat.Util (liftIOEither) @@ -165,7 +165,7 @@ defaultChatHooks = ChatHooks { preCmdHook = \_ -> pure . Right, eventHook = \_ -> pure - } + } data DefaultAgentServers = DefaultAgentServers { smp :: NonEmpty SMPServerWithAuth, @@ -208,6 +208,7 @@ data ChatController = ChatController connNetworkStatuses :: TMap AgentConnId NetworkStatus, subscriptionMode :: TVar SubscriptionMode, chatLock :: Lock, + entityLocks :: TMap ChatLockEntity Lock, sndFiles :: TVar (Map Int64 Handle), rcvFiles :: TVar (Map Int64 Handle), currentCalls :: TMap ContactId Call, @@ -491,9 +492,9 @@ data ChatCommand | GetAgentSubsDetails | GetAgentWorkers | GetAgentWorkersDetails - -- The parser will return this command for strings that start from "//". - -- This command should be processed in preCmdHook - | CustomChatCommand ByteString + | -- The parser will return this command for strings that start from "//". + -- This command should be processed in preCmdHook + CustomChatCommand ByteString deriving (Show) allowRemoteCommand :: ChatCommand -> Bool -- XXX: consider using Relay/Block/ForceLocal @@ -731,7 +732,7 @@ data ChatResponse | CRContactPQEnabled {user :: User, contact :: Contact, pqEnabled :: PQEncryption} | CRSQLResult {rows :: [Text]} | CRSlowSQLQueries {chatQueries :: [SlowSQLQuery], agentQueries :: [SlowSQLQuery]} - | CRDebugLocks {chatLockName :: Maybe String, agentLocks :: AgentLocks} + | CRDebugLocks {chatLockName :: Maybe String, chatEntityLocks :: Map String String, agentLocks :: AgentLocks} | CRAgentStats {agentStats :: [[String]]} | CRAgentWorkersDetails {agentWorkersDetails :: AgentWorkersDetails} | CRAgentWorkersSummary {agentWorkersSummary :: AgentWorkersSummary} @@ -1353,7 +1354,7 @@ handleDBErrors = [ E.Handler $ \(e :: SQLError) -> let se = SQL.sqlError e busy = se == SQL.ErrorBusy || se == SQL.ErrorLocked - in pure . Left . ChatErrorStore $ if busy then SEDBBusyError $ show se else SEDBException $ show e, + in pure . Left . ChatErrorStore $ if busy then SEDBBusyError $ show se else SEDBException $ show e, E.Handler $ \(E.SomeException e) -> pure . Left . ChatErrorStore . SEDBException $ show e ] diff --git a/src/Simplex/Chat/Protocol.hs b/src/Simplex/Chat/Protocol.hs index e2810dafa9..8727a592a7 100644 --- a/src/Simplex/Chat/Protocol.hs +++ b/src/Simplex/Chat/Protocol.hs @@ -48,7 +48,7 @@ import Simplex.Chat.Types import Simplex.Chat.Types.Util import Simplex.Messaging.Agent.Protocol (VersionSMPA, pqdrSMPAgentVersion) import Simplex.Messaging.Compression (compress1, decompressBatch) -import Simplex.Messaging.Crypto.Ratchet (PQSupport (..), pattern PQSupportOn, pattern PQSupportOff) +import Simplex.Messaging.Crypto.Ratchet (PQSupport (..), pattern PQSupportOff, pattern PQSupportOn) import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String import Simplex.Messaging.Parsers (defaultJSON, dropPrefix, fromTextField_, fstToLower, parseAll, sumTypeJSON, taggedObjectJSON) diff --git a/src/Simplex/Chat/Store.hs b/src/Simplex/Chat/Store.hs index 91021713b1..4b0591fb3a 100644 --- a/src/Simplex/Chat/Store.hs +++ b/src/Simplex/Chat/Store.hs @@ -1,6 +1,7 @@ module Simplex.Chat.Store ( SQLiteStore, StoreError (..), + ChatLockEntity (..), UserMsgReceiptSettings (..), UserContactLink (..), AutoAccept (..), diff --git a/src/Simplex/Chat/Store/Connections.hs b/src/Simplex/Chat/Store/Connections.hs index 6584aabb0a..0e543eacf2 100644 --- a/src/Simplex/Chat/Store/Connections.hs +++ b/src/Simplex/Chat/Store/Connections.hs @@ -3,11 +3,13 @@ {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE QuasiQuotes #-} +{-# LANGUAGE TupleSections #-} {-# LANGUAGE TypeOperators #-} {-# OPTIONS_GHC -fno-warn-ambiguous-fields #-} module Simplex.Chat.Store.Connections - ( getConnectionEntity, + ( getChatLockEntity, + getConnectionEntity, getConnectionEntityByConnReq, getContactConnEntityByConnReqHash, getConnectionsToSubscribe, @@ -37,6 +39,31 @@ import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB import Simplex.Messaging.Crypto.Ratchet (PQSupport) import Simplex.Messaging.Util (eitherToMaybe) +getChatLockEntity :: DB.Connection -> AgentConnId -> ExceptT StoreError IO ChatLockEntity +getChatLockEntity db agentConnId = do + ((connId, connType) :. (contactId, groupMemberId, sndFileId, rcvFileId, userContactLinkId)) <- + ExceptT . firstRow id (SEConnectionNotFound agentConnId) $ + DB.query + db + [sql| + SELECT connection_id, conn_type, contact_id, group_member_id, snd_file_id, rcv_file_id, user_contact_link_id + FROM connections + WHERE agent_conn_id = ? + |] + (Only agentConnId) + let err = throwError $ SEInternalError $ "connection " <> show connType <> " without entity" + case connType of + ConnMember -> maybe err (fmap CLGroup . getMemberGroupId) groupMemberId + ConnContact -> pure $ maybe (CLConnection connId) CLContact contactId + ConnSndFile -> maybe err (pure . CLFile) sndFileId + ConnRcvFile -> maybe err (pure . CLFile) rcvFileId + ConnUserContact -> maybe err (pure . CLUserContact) userContactLinkId + where + getMemberGroupId :: GroupMemberId -> ExceptT StoreError IO GroupId + getMemberGroupId groupMemberId = + ExceptT . firstRow fromOnly (SEInternalError "group member connection group_id not found") $ + DB.query db "SELECT group_id FROM group_members WHERE group_member_id = ?" (Only groupMemberId) + getConnectionEntity :: DB.Connection -> (PQSupport -> VersionRangeChat) -> User -> AgentConnId -> ExceptT StoreError IO ConnectionEntity getConnectionEntity db vr user@User {userId, userContactId} agentConnId = do c@Connection {connType, entityId} <- getConnection_ diff --git a/src/Simplex/Chat/Store/Files.hs b/src/Simplex/Chat/Store/Files.hs index e77681bb9b..8ac54c7e9b 100644 --- a/src/Simplex/Chat/Store/Files.hs +++ b/src/Simplex/Chat/Store/Files.hs @@ -336,10 +336,10 @@ setSndFTAgentDeleted db User {userId} fileId = do "UPDATE files SET agent_snd_file_deleted = 1, updated_at = ? WHERE user_id = ? AND file_id = ?" (currentTs, userId, fileId) -getXFTPSndFileDBId :: DB.Connection -> User -> AgentSndFileId -> ExceptT StoreError IO FileTransferId -getXFTPSndFileDBId db User {userId} aSndFileId = +getXFTPSndFileDBId :: DB.Connection -> AgentSndFileId -> ExceptT StoreError IO FileTransferId +getXFTPSndFileDBId db aSndFileId = ExceptT . firstRow fromOnly (SESndFileNotFoundXFTP aSndFileId) $ - DB.query db "SELECT file_id FROM files WHERE user_id = ? AND agent_snd_file_id = ?" (userId, aSndFileId) + DB.query db "SELECT file_id FROM files WHERE agent_snd_file_id = ?" (Only aSndFileId) getXFTPRcvFileDBId :: DB.Connection -> AgentRcvFileId -> ExceptT StoreError IO FileTransferId getXFTPRcvFileDBId db aRcvFileId = diff --git a/src/Simplex/Chat/Store/Shared.hs b/src/Simplex/Chat/Store/Shared.hs index 88540134fe..ef7cda4802 100644 --- a/src/Simplex/Chat/Store/Shared.hs +++ b/src/Simplex/Chat/Store/Shared.hs @@ -45,6 +45,15 @@ import Simplex.Messaging.Util (allFinally) import Simplex.Messaging.Version import UnliftIO.STM +data ChatLockEntity + = CLInvitation ByteString + | CLConnection Int64 + | CLContact ContactId + | CLGroup GroupId + | CLUserContact Int64 + | CLFile Int64 + deriving (Eq, Ord) + -- These error type constructors must be added to mobile apps data StoreError = SEDuplicateName diff --git a/src/Simplex/Chat/View.hs b/src/Simplex/Chat/View.hs index 65a6626308..550fe97b18 100644 --- a/src/Simplex/Chat/View.hs +++ b/src/Simplex/Chat/View.hs @@ -351,8 +351,9 @@ responseToView hu@(currentRH, user_) ChatConfig {logLevel, showReactions, showRe <> (" :: avg: " <> sShow timeAvg <> " ms") <> (" :: " <> plain (T.unwords $ T.lines query)) in ("Chat queries" : map viewQuery chatQueries) <> [""] <> ("Agent queries" : map viewQuery agentQueries) - CRDebugLocks {chatLockName, agentLocks} -> + CRDebugLocks {chatLockName, chatEntityLocks, agentLocks} -> [ maybe "no chat lock" (("chat lock: " <>) . plain) chatLockName, + plain $ "chat entity locks: " <> LB.unpack (J.encode chatEntityLocks), plain $ "agent locks: " <> LB.unpack (J.encode agentLocks) ] CRAgentStats stats -> map (plain . intercalate ",") stats @@ -1595,7 +1596,7 @@ standaloneUploadComplete FileTransferMeta {fileId, fileName} = \case [] -> [fileTransferStr fileId fileName <> " upload complete."] uris -> fileTransferStr fileId fileName <> " upload complete. download with:" - : map plain uris + : map plain uris sndFile :: SndFileTransfer -> StyledString sndFile SndFileTransfer {fileId, fileName} = fileTransferStr fileId fileName diff --git a/tests/ChatTests/Files.hs b/tests/ChatTests/Files.hs index 1e72df9156..e50b20844a 100644 --- a/tests/ChatTests/Files.hs +++ b/tests/ChatTests/Files.hs @@ -787,7 +787,8 @@ testXFTPCancelRcvRepeat = bob ##> "/fr 1 ./tests/tmp" bob <### [ "saving file 1 from alice to ./tests/tmp/testfile_1", - "started receiving file 1 (testfile) from alice" + "started receiving file 1 (testfile) from alice", + StartsWith "chat db error: SERcvFileNotFoundXFTP" ] bob <## "completed receiving file 1 (testfile) from alice" diff --git a/tests/ChatTests/Profiles.hs b/tests/ChatTests/Profiles.hs index 7996fde3ad..8a9191c988 100644 --- a/tests/ChatTests/Profiles.hs +++ b/tests/ChatTests/Profiles.hs @@ -179,10 +179,12 @@ testMultiWordProfileNames = alice <# "#'Our Team' 'Bob James'> hi" cath <# "#'Our Team' 'Bob James'> hi" alice `send` "@'Cath Johnson' hello" - alice <## "member #'Our Team' 'Cath Johnson' does not have direct connection, creating" - alice <## "contact for member #'Our Team' 'Cath Johnson' is created" - alice <## "sent invitation to connect directly to member #'Our Team' 'Cath Johnson'" - alice <# "@'Cath Johnson' hello" + alice + <### [ "member #'Our Team' 'Cath Johnson' does not have direct connection, creating", + "contact for member #'Our Team' 'Cath Johnson' is created", + "sent invitation to connect directly to member #'Our Team' 'Cath Johnson'", + WithTime "@'Cath Johnson' hello" + ] cath <## "#'Our Team' 'Alice Jones' is creating direct contact 'Alice Jones' with you" cath <# "'Alice Jones'> hello" cath <## "'Alice Jones': contact is connected"