core: entity locks (#3962)

* core: entity locks

* more locks

* update sha256map

* add delay

* clean up

* empty

* fix tests

* empty

* empty

* more delays

* empty

* comment delays

* Revert "comment delays"

This reverts commit 4245b545fb.

* Revert "Revert "comment delays""

This reverts commit f803386945.

* take lock in the beginning of processing loop

* empty

* empty

* remove lock

* rework file locks

* empty

* fix

* empty

* add connection locks

* empty

* fix test

* empty

* remove commented delays

* add to debug locks

* update

* refactor

* refactor

---------

Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>
This commit is contained in:
spaced4ndy
2024-04-04 22:24:42 +04:00
committed by GitHub
parent a725d2efac
commit 069395c2a0
12 changed files with 255 additions and 178 deletions
+1 -1
View File
@@ -12,7 +12,7 @@ constraints: zip +disable-bzip2 +disable-zstd
source-repository-package
type: git
location: https://github.com/simplex-chat/simplexmq.git
tag: 6bc4f6c94e11f59604b0d9c576e62e01bc08b4cd
tag: 791368c7beb3996ab4a10f25dbf8cad1e289b413
source-repository-package
type: git
+1 -1
View File
@@ -1,5 +1,5 @@
{
"https://github.com/simplex-chat/simplexmq.git"."6bc4f6c94e11f59604b0d9c576e62e01bc08b4cd" = "08l00ay1ibz7skhlpfjp6z2821zpfd0kxplwdm6zc63m2f6za7cv";
"https://github.com/simplex-chat/simplexmq.git"."791368c7beb3996ab4a10f25dbf8cad1e289b413" = "0wbxk69lv6h17b5rdqskxwhc1wfvn1zi8q4a4w57qfzkzyaxkymk";
"https://github.com/simplex-chat/hs-socks.git"."a30cc7a79a08d8108316094f8f2f82a0c5e1ac51" = "0yasvnr7g91k76mjkamvzab2kvlb1g5pspjyjn2fr6v83swjhj38";
"https://github.com/simplex-chat/direct-sqlcipher.git"."f814ee68b16a9447fbb467ccc8f29bdd3546bfd9" = "1ql13f4kfwkbaq7nygkxgw84213i0zm7c1a8hwvramayxl38dq5d";
"https://github.com/simplex-chat/sqlcipher-simple.git"."a46bd361a19376c5211f1058908fc0ae6bf42446" = "1z0r78d8f0812kxbgsm735qf6xx8lvaz27k1a0b4a2m0sshpd5gl";
+192 -157
View File
@@ -88,9 +88,9 @@ import Simplex.FileTransfer.Description (FileDescriptionURI (..), ValidFileDescr
import qualified Simplex.FileTransfer.Description as FD
import Simplex.FileTransfer.Protocol (FileParty (..), FilePartyI)
import Simplex.Messaging.Agent as Agent
import Simplex.Messaging.Agent.Client (AgentStatsKey (..), SubInfo (..), agentClientStore, getAgentWorkersDetails, getAgentWorkersSummary, temporaryAgentError)
import Simplex.Messaging.Agent.Client (AgentStatsKey (..), SubInfo (..), agentClientStore, getAgentWorkersDetails, getAgentWorkersSummary, temporaryAgentError, withLockMap)
import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), InitialAgentServers (..), createAgentStore, defaultAgentConfig)
import Simplex.Messaging.Agent.Lock
import Simplex.Messaging.Agent.Lock (withLock)
import Simplex.Messaging.Agent.Protocol
import qualified Simplex.Messaging.Agent.Protocol as AP (AgentErrorType (..))
import Simplex.Messaging.Agent.Store.SQLite (MigrationConfirmation (..), MigrationError, SQLiteStore (dbNew), execSQL, upMigration, withConnection)
@@ -227,6 +227,7 @@ newChatController
connNetworkStatuses <- atomically TM.empty
subscriptionMode <- newTVarIO SMSubscribe
chatLock <- newEmptyTMVarIO
entityLocks <- atomically TM.empty
sndFiles <- newTVarIO M.empty
rcvFiles <- newTVarIO M.empty
currentCalls <- atomically TM.empty
@@ -263,6 +264,7 @@ newChatController
connNetworkStatuses,
subscriptionMode,
chatLock,
entityLocks,
sndFiles,
rcvFiles,
currentCalls,
@@ -310,6 +312,40 @@ newChatController
userServers :: User -> IO (NonEmpty (ProtoServerWithAuth p))
userServers user' = activeAgentServers config protocol <$> withTransaction chatStore (`getProtocolServers` user')
withChatLock :: String -> CM a -> CM a
withChatLock name action = asks chatLock >>= \l -> withLock l name action
withEntityLock :: String -> ChatLockEntity -> CM a -> CM a
withEntityLock name entity action = do
chatLock <- asks chatLock
ls <- asks entityLocks
atomically $ unlessM (isEmptyTMVar chatLock) retry
withLockMap ls entity name action
withInvitationLock :: String -> ByteString -> CM a -> CM a
withInvitationLock name = withEntityLock name . CLInvitation
{-# INLINE withInvitationLock #-}
withConnectionLock :: String -> Int64 -> CM a -> CM a
withConnectionLock name = withEntityLock name . CLConnection
{-# INLINE withConnectionLock #-}
withContactLock :: String -> ContactId -> CM a -> CM a
withContactLock name = withEntityLock name . CLContact
{-# INLINE withContactLock #-}
withGroupLock :: String -> GroupId -> CM a -> CM a
withGroupLock name = withEntityLock name . CLGroup
{-# INLINE withGroupLock #-}
withUserContactLock :: String -> Int64 -> CM a -> CM a
withUserContactLock name = withEntityLock name . CLUserContact
{-# INLINE withUserContactLock #-}
withFileLock :: String -> Int64 -> CM a -> CM a
withFileLock name = withEntityLock name . CLFile
{-# INLINE withFileLock #-}
activeAgentServers :: UserProtocol p => ChatConfig -> SProtocolType p -> [ServerCfg p] -> NonEmpty (ProtoServerWithAuth p)
activeAgentServers ChatConfig {defaultServers} p =
fromMaybe (cfgServers p defaultServers)
@@ -669,8 +705,8 @@ processChatCommand' vr = \case
memStatuses -> pure $ Just $ map (uncurry MemberDeliveryStatus) memStatuses
_ -> pure Nothing
pure $ CRChatItemInfo user aci ChatItemInfo {itemVersions, memberDeliveryStatuses}
APISendMessage (ChatRef cType chatId) live itemTTL (ComposedMessage file_ quotedItemId_ mc) -> withUser $ \user -> withChatLock "sendMessage" $ case cType of
CTDirect -> do
APISendMessage (ChatRef cType chatId) live itemTTL (ComposedMessage file_ quotedItemId_ mc) -> withUser $ \user -> case cType of
CTDirect -> withContactLock "sendMessage" chatId $ do
ct@Contact {contactId, contactUsed} <- withStore $ \db -> getContact db vr user chatId
assertDirectAllowed user MDSnd ct XMsgNew_
unless contactUsed $ withStore' $ \db -> updateContactUsed db user ct
@@ -707,7 +743,7 @@ processChatCommand' vr = \case
quoteData ChatItem {content = CISndMsgContent qmc} = pure (qmc, CIQDirectSnd, True)
quoteData ChatItem {content = CIRcvMsgContent qmc} = pure (qmc, CIQDirectRcv, False)
quoteData _ = throwChatError CEInvalidQuote
CTGroup -> do
CTGroup -> withGroupLock "sendMessage" chatId $ do
g@(Group gInfo _) <- withStore $ \db -> getGroup db vr user chatId
assertUserGroupRole gInfo GRAuthor
send g
@@ -767,8 +803,8 @@ processChatCommand' vr = \case
pure CIFile {fileId, fileName = takeFileName filePath, fileSize, fileSource = Just cf, fileStatus = CIFSSndStored, fileProtocol = FPLocal}
let ci = mkChatItem cd ciId content ciFile_ Nothing Nothing Nothing False createdAt Nothing createdAt
pure . CRNewChatItem user $ AChatItem SCTLocal SMDSnd (LocalChat nf) ci
APIUpdateChatItem (ChatRef cType chatId) itemId live mc -> withUser $ \user -> withChatLock "updateChatItem" $ case cType of
CTDirect -> do
APIUpdateChatItem (ChatRef cType chatId) itemId live mc -> withUser $ \user -> case cType of
CTDirect -> withContactLock "updateChatItem" chatId $ do
ct@Contact {contactId} <- withStore $ \db -> getContact db vr user chatId
assertDirectAllowed user MDSnd ct XMsgUpdate_
cci <- withStore $ \db -> getDirectCIWithReactions db user ct itemId
@@ -790,7 +826,7 @@ processChatCommand' vr = \case
else pure $ CRChatItemNotChanged user (AChatItem SCTDirect SMDSnd (DirectChat ct) ci)
_ -> throwChatError CEInvalidChatItemUpdate
CChatItem SMDRcv _ -> throwChatError CEInvalidChatItemUpdate
CTGroup -> do
CTGroup -> withGroupLock "updateChatItem" chatId $ do
Group gInfo@GroupInfo {groupId} ms <- withStore $ \db -> getGroup db vr user chatId
assertUserGroupRole gInfo GRAuthor
cci <- withStore $ \db -> getGroupCIWithReactions db user gInfo itemId
@@ -825,8 +861,8 @@ processChatCommand' vr = \case
_ -> throwChatError CEInvalidChatItemUpdate
CTContactRequest -> pure $ chatCmdError (Just user) "not supported"
CTContactConnection -> pure $ chatCmdError (Just user) "not supported"
APIDeleteChatItem (ChatRef cType chatId) itemId mode -> withUser $ \user -> withChatLock "deleteChatItem" $ case cType of
CTDirect -> do
APIDeleteChatItem (ChatRef cType chatId) itemId mode -> withUser $ \user -> case cType of
CTDirect -> withContactLock "deleteChatItem" chatId $ do
(ct, CChatItem msgDir ci@ChatItem {meta = CIMeta {itemSharedMsgId, editable}}) <- withStore $ \db -> (,) <$> getContact db vr user chatId <*> getDirectChatItem db user chatId itemId
case (mode, msgDir, itemSharedMsgId, editable) of
(CIDMInternal, _, _, _) -> deleteDirectCI user ct ci True False
@@ -837,7 +873,7 @@ processChatCommand' vr = \case
then deleteDirectCI user ct ci True False
else markDirectCIDeleted user ct ci msgId True =<< liftIO getCurrentTime
(CIDMBroadcast, _, _, _) -> throwChatError CEInvalidChatItemDelete
CTGroup -> do
CTGroup -> withGroupLock "deleteChatItem" chatId $ do
Group gInfo ms <- withStore $ \db -> getGroup db vr user chatId
CChatItem msgDir ci@ChatItem {meta = CIMeta {itemSharedMsgId, editable}} <- withStore $ \db -> getGroupChatItem db user chatId itemId
case (mode, msgDir, itemSharedMsgId, editable) of
@@ -852,7 +888,7 @@ processChatCommand' vr = \case
deleteLocalCI user nf ci True False
CTContactRequest -> pure $ chatCmdError (Just user) "not supported"
CTContactConnection -> pure $ chatCmdError (Just user) "not supported"
APIDeleteMemberChatItem gId mId itemId -> withUser $ \user -> withChatLock "deleteChatItem" $ do
APIDeleteMemberChatItem gId mId itemId -> withUser $ \user -> withGroupLock "deleteChatItem" gId $ do
Group gInfo@GroupInfo {membership} ms <- withStore $ \db -> getGroup db vr user gId
CChatItem _ ci@ChatItem {chatDir, meta = CIMeta {itemSharedMsgId}} <- withStore $ \db -> getGroupChatItem db user gId itemId
case (chatDir, itemSharedMsgId) of
@@ -862,44 +898,46 @@ processChatCommand' vr = \case
(SndMessage {msgId}, _) <- sendGroupMessage user gInfo ms $ XMsgDel itemSharedMId $ Just memberId
delGroupChatItem user gInfo ci msgId (Just membership)
(_, _) -> throwChatError CEInvalidChatItemDelete
APIChatItemReaction (ChatRef cType chatId) itemId add reaction -> withUser $ \user -> withChatLock "chatItemReaction" $ case cType of
APIChatItemReaction (ChatRef cType chatId) itemId add reaction -> withUser $ \user -> case cType of
CTDirect ->
withStore (\db -> (,) <$> getContact db vr user chatId <*> getDirectChatItem db user chatId itemId) >>= \case
(ct, CChatItem md ci@ChatItem {meta = CIMeta {itemSharedMsgId = Just itemSharedMId}}) -> do
unless (featureAllowed SCFReactions forUser ct) $
throwChatError (CECommandError $ "feature not allowed " <> T.unpack (chatFeatureNameText CFReactions))
unless (ciReactionAllowed ci) $
throwChatError (CECommandError "reaction not allowed - chat item has no content")
rs <- withStore' $ \db -> getDirectReactions db ct itemSharedMId True
checkReactionAllowed rs
(SndMessage {msgId}, _) <- sendDirectContactMessage user ct $ XMsgReact itemSharedMId Nothing reaction add
createdAt <- liftIO getCurrentTime
reactions <- withStore' $ \db -> do
setDirectReaction db ct itemSharedMId True reaction add msgId createdAt
liftIO $ getDirectCIReactions db ct itemSharedMId
let ci' = CChatItem md ci {reactions}
r = ACIReaction SCTDirect SMDSnd (DirectChat ct) $ CIReaction CIDirectSnd ci' createdAt reaction
pure $ CRChatItemReaction user add r
_ -> throwChatError $ CECommandError "reaction not possible - no shared item ID"
withContactLock "chatItemReaction" chatId $
withStore (\db -> (,) <$> getContact db vr user chatId <*> getDirectChatItem db user chatId itemId) >>= \case
(ct, CChatItem md ci@ChatItem {meta = CIMeta {itemSharedMsgId = Just itemSharedMId}}) -> do
unless (featureAllowed SCFReactions forUser ct) $
throwChatError (CECommandError $ "feature not allowed " <> T.unpack (chatFeatureNameText CFReactions))
unless (ciReactionAllowed ci) $
throwChatError (CECommandError "reaction not allowed - chat item has no content")
rs <- withStore' $ \db -> getDirectReactions db ct itemSharedMId True
checkReactionAllowed rs
(SndMessage {msgId}, _) <- sendDirectContactMessage user ct $ XMsgReact itemSharedMId Nothing reaction add
createdAt <- liftIO getCurrentTime
reactions <- withStore' $ \db -> do
setDirectReaction db ct itemSharedMId True reaction add msgId createdAt
liftIO $ getDirectCIReactions db ct itemSharedMId
let ci' = CChatItem md ci {reactions}
r = ACIReaction SCTDirect SMDSnd (DirectChat ct) $ CIReaction CIDirectSnd ci' createdAt reaction
pure $ CRChatItemReaction user add r
_ -> throwChatError $ CECommandError "reaction not possible - no shared item ID"
CTGroup ->
withStore (\db -> (,) <$> getGroup db vr user chatId <*> getGroupChatItem db user chatId itemId) >>= \case
(Group g@GroupInfo {membership} ms, CChatItem md ci@ChatItem {meta = CIMeta {itemSharedMsgId = Just itemSharedMId}}) -> do
unless (groupFeatureAllowed SGFReactions g) $
throwChatError (CECommandError $ "feature not allowed " <> T.unpack (chatFeatureNameText CFReactions))
unless (ciReactionAllowed ci) $
throwChatError (CECommandError "reaction not allowed - chat item has no content")
let GroupMember {memberId = itemMemberId} = chatItemMember g ci
rs <- withStore' $ \db -> getGroupReactions db g membership itemMemberId itemSharedMId True
checkReactionAllowed rs
(SndMessage {msgId}, _) <- sendGroupMessage user g ms (XMsgReact itemSharedMId (Just itemMemberId) reaction add)
createdAt <- liftIO getCurrentTime
reactions <- withStore' $ \db -> do
setGroupReaction db g membership itemMemberId itemSharedMId True reaction add msgId createdAt
liftIO $ getGroupCIReactions db g itemMemberId itemSharedMId
let ci' = CChatItem md ci {reactions}
r = ACIReaction SCTGroup SMDSnd (GroupChat g) $ CIReaction CIGroupSnd ci' createdAt reaction
pure $ CRChatItemReaction user add r
_ -> throwChatError $ CECommandError "reaction not possible - no shared item ID"
withGroupLock "chatItemReaction" chatId $
withStore (\db -> (,) <$> getGroup db vr user chatId <*> getGroupChatItem db user chatId itemId) >>= \case
(Group g@GroupInfo {membership} ms, CChatItem md ci@ChatItem {meta = CIMeta {itemSharedMsgId = Just itemSharedMId}}) -> do
unless (groupFeatureAllowed SGFReactions g) $
throwChatError (CECommandError $ "feature not allowed " <> T.unpack (chatFeatureNameText CFReactions))
unless (ciReactionAllowed ci) $
throwChatError (CECommandError "reaction not allowed - chat item has no content")
let GroupMember {memberId = itemMemberId} = chatItemMember g ci
rs <- withStore' $ \db -> getGroupReactions db g membership itemMemberId itemSharedMId True
checkReactionAllowed rs
(SndMessage {msgId}, _) <- sendGroupMessage user g ms (XMsgReact itemSharedMId (Just itemMemberId) reaction add)
createdAt <- liftIO getCurrentTime
reactions <- withStore' $ \db -> do
setGroupReaction db g membership itemMemberId itemSharedMId True reaction add msgId createdAt
liftIO $ getGroupCIReactions db g itemMemberId itemSharedMId
let ci' = CChatItem md ci {reactions}
r = ACIReaction SCTGroup SMDSnd (GroupChat g) $ CIReaction CIGroupSnd ci' createdAt reaction
pure $ CRChatItemReaction user add r
_ -> throwChatError $ CECommandError "reaction not possible - no shared item ID"
CTLocal -> pure $ chatCmdError (Just user) "not supported"
CTContactRequest -> pure $ chatCmdError (Just user) "not supported"
CTContactConnection -> pure $ chatCmdError (Just user) "not supported"
@@ -959,7 +997,7 @@ processChatCommand' vr = \case
CTDirect -> do
ct <- withStore $ \db -> getContact db vr user chatId
filesInfo <- withStore' $ \db -> getContactFileInfo db user ct
withChatLock "deleteChat direct" . procCmd $ do
withContactLock "deleteChat direct" chatId . procCmd $ do
cancelFilesInProgress user filesInfo
deleteFilesLocally filesInfo
let doSendDel = contactReady ct && contactActive ct && notify
@@ -971,7 +1009,7 @@ processChatCommand' vr = \case
withStore' $ \db -> deleteContactConnectionsAndFiles db userId ct
withStore $ \db -> deleteContact db user ct
pure $ CRContactDeleted user ct
CTContactConnection -> withChatLock "deleteChat contactConnection" . procCmd $ do
CTContactConnection -> withConnectionLock "deleteChat contactConnection" chatId . procCmd $ do
conn@PendingContactConnection {pccAgentConnId = AgentConnId acId} <- withStore $ \db -> getPendingContactConnection db userId chatId
deleteAgentConnectionAsync user acId
withStore' $ \db -> deletePendingContactConnection db userId chatId
@@ -983,7 +1021,7 @@ processChatCommand' vr = \case
canDelete = isOwner || not (memberCurrent membership)
unless canDelete $ throwChatError $ CEGroupUserRole gInfo GROwner
filesInfo <- withStore' $ \db -> getGroupFileInfo db user gInfo
withChatLock "deleteChat group" . procCmd $ do
withGroupLock "deleteChat group" chatId . procCmd $ do
cancelFilesInProgress user filesInfo
deleteFilesLocally filesInfo
let doSendDel = memberActive membership && isOwner
@@ -1038,28 +1076,29 @@ processChatCommand' vr = \case
CTLocal -> do
nf <- withStore $ \db -> getNoteFolder db user chatId
filesInfo <- withStore' $ \db -> getNoteFolderFileInfo db user nf
withChatLock "clearChat local" . procCmd $ do
deleteFilesLocally filesInfo
withStore' $ \db -> deleteNoteFolderFiles db userId nf
withStore' $ \db -> deleteNoteFolderCIs db user nf
pure $ CRChatCleared user (AChatInfo SCTLocal $ LocalChat nf)
deleteFilesLocally filesInfo
withStore' $ \db -> deleteNoteFolderFiles db userId nf
withStore' $ \db -> deleteNoteFolderCIs db user nf
pure $ CRChatCleared user (AChatInfo SCTLocal $ LocalChat nf)
CTContactConnection -> pure $ chatCmdError (Just user) "not supported"
CTContactRequest -> pure $ chatCmdError (Just user) "not supported"
APIAcceptContact incognito connReqId -> withUser $ \_ -> withChatLock "acceptContact" $ do
APIAcceptContact incognito connReqId -> withUser $ \_ -> do
(user@User {userId}, cReq@UserContactRequest {userContactLinkId}) <- withStore $ \db -> getContactRequest' db connReqId
ucl <- withStore $ \db -> getUserContactLinkById db userId userContactLinkId
let contactUsed = (\(_, groupId_, _) -> isNothing groupId_) ucl
-- [incognito] generate profile to send, create connection with incognito profile
incognitoProfile <- if incognito then Just . NewIncognito <$> liftIO generateRandomProfile else pure Nothing
ct <- acceptContactRequest user cReq incognitoProfile contactUsed
pure $ CRAcceptingContactRequest user ct
APIRejectContact connReqId -> withUser $ \user -> withChatLock "rejectContact" $ do
cReq@UserContactRequest {agentContactConnId = AgentConnId connId, agentInvitationId = AgentInvId invId} <-
withUserContactLock "acceptContact" userContactLinkId $ do
ucl <- withStore $ \db -> getUserContactLinkById db userId userContactLinkId
let contactUsed = (\(_, groupId_, _) -> isNothing groupId_) ucl
-- [incognito] generate profile to send, create connection with incognito profile
incognitoProfile <- if incognito then Just . NewIncognito <$> liftIO generateRandomProfile else pure Nothing
ct <- acceptContactRequest user cReq incognitoProfile contactUsed
pure $ CRAcceptingContactRequest user ct
APIRejectContact connReqId -> withUser $ \user -> do
cReq@UserContactRequest {userContactLinkId, agentContactConnId = AgentConnId connId, agentInvitationId = AgentInvId invId} <-
withStore $ \db ->
getContactRequest db user connReqId
`storeFinally` liftIO (deleteContactRequest db user connReqId)
withAgent $ \a -> rejectContact a connId invId
pure $ CRContactRequestRejected user cReq
withUserContactLock "rejectContact" userContactLinkId $ do
withAgent $ \a -> rejectContact a connId invId
pure $ CRContactRequestRejected user cReq
APISendCallInvitation contactId callType -> withUser $ \user -> do
-- party initiating call
ct <- withStore $ \db -> getContact db vr user contactId
@@ -1067,7 +1106,7 @@ processChatCommand' vr = \case
if featureAllowed SCFCalls forUser ct
then do
calls <- asks currentCalls
withChatLock "sendCallInvitation" $ do
withContactLock "sendCallInvitation" contactId $ do
g <- asks random
callId <- atomically $ CallId <$> C.randomBytes 16 g
dhKeyPair <- atomically $ if encryptedCall callType then Just <$> C.generateKeyPair g else pure Nothing
@@ -1192,12 +1231,11 @@ processChatCommand' vr = \case
toServerCfg server = ServerCfg {server, preset = True, tested = Nothing, enabled = True}
GetUserProtoServers aProtocol -> withUser $ \User {userId} ->
processChatCommand $ APIGetUserProtoServers userId aProtocol
APISetUserProtoServers userId (APSC p (ProtoServersConfig servers)) -> withUserId userId $ \user -> withServerProtocol p $
withChatLock "setUserSMPServers" $ do
withStore $ \db -> overwriteProtocolServers db user servers
cfg <- asks config
lift $ withAgent' $ \a -> setProtocolServers a (aUserId user) $ activeAgentServers cfg p servers
ok user
APISetUserProtoServers userId (APSC p (ProtoServersConfig servers)) -> withUserId userId $ \user -> withServerProtocol p $ do
withStore $ \db -> overwriteProtocolServers db user servers
cfg <- asks config
lift $ withAgent' $ \a -> setProtocolServers a (aUserId user) $ activeAgentServers cfg p servers
ok user
SetUserProtoServers serversConfig -> withUser $ \User {userId} ->
processChatCommand $ APISetUserProtoServers userId serversConfig
APITestProtoServer userId srv@(AProtoServerWithAuth _ server) -> withUserId userId $ \user ->
@@ -1300,7 +1338,7 @@ processChatCommand' vr = \case
connectionStats <- withAgent $ \a -> abortConnectionSwitch a connId
pure $ CRGroupMemberSwitchAborted user g m connectionStats
_ -> throwChatError CEGroupMemberNotActive
APISyncContactRatchet contactId force -> withUser $ \user -> withChatLock "syncContactRatchet" $ do
APISyncContactRatchet contactId force -> withUser $ \user -> withContactLock "syncContactRatchet" contactId $ do
ct <- withStore $ \db -> getContact db vr user contactId
case contactConn ct of
Just conn@Connection {pqSupport} -> do
@@ -1308,7 +1346,7 @@ processChatCommand' vr = \case
createInternalChatItem user (CDDirectSnd ct) (CISndConnEvent $ SCERatchetSync rss Nothing) Nothing
pure $ CRContactRatchetSyncStarted user ct cStats
Nothing -> throwChatError $ CEContactNotActive ct
APISyncGroupMemberRatchet gId gMemberId force -> withUser $ \user -> withChatLock "syncGroupMemberRatchet" $ do
APISyncGroupMemberRatchet gId gMemberId force -> withUser $ \user -> withGroupLock "syncGroupMemberRatchet" gId $ do
(g, m) <- withStore $ \db -> (,) <$> getGroupInfo db vr user gId <*> getGroupMember db vr user gId gMemberId
case memberConnId m of
Just connId -> do
@@ -1397,7 +1435,7 @@ processChatCommand' vr = \case
EnableGroupMember gName mName -> withMemberName gName mName $ \gId mId -> APIEnableGroupMember gId mId
ChatHelp section -> pure $ CRChatHelp section
Welcome -> withUser $ pure . CRWelcome
APIAddContact userId incognito -> withUserId userId $ \user -> withChatLock "addContact" . procCmd $ do
APIAddContact userId incognito -> withUserId userId $ \user -> procCmd $ do
-- [incognito] generate profile for connection
incognitoProfile <- if incognito then Just <$> liftIO generateRandomProfile else pure Nothing
subMode <- chatReadVar subscriptionMode
@@ -1424,9 +1462,8 @@ processChatCommand' vr = \case
Just conn' -> pure $ CRConnectionIncognitoUpdated user conn'
Nothing -> throwChatError CEConnectionIncognitoChangeProhibited
APIConnectPlan userId cReqUri -> withUserId userId $ \user ->
withChatLock "connectPlan" . procCmd $
CRConnectionPlan user <$> connectPlan user cReqUri
APIConnect userId incognito (Just (ACR SCMInvitation cReq)) -> withUserId userId $ \user -> withChatLock "connect" . procCmd $ do
CRConnectionPlan user <$> connectPlan user cReqUri
APIConnect userId incognito (Just (ACR SCMInvitation cReq)) -> withUserId userId $ \user -> withInvitationLock "connect" (strEncode cReq) . procCmd $ do
subMode <- chatReadVar subscriptionMode
-- [incognito] generate profile to send
incognitoProfile <- if incognito then Just <$> liftIO generateRandomProfile else pure Nothing
@@ -1471,7 +1508,7 @@ processChatCommand' vr = \case
CRContactsList user <$> withStore' (\db -> getUserContacts db vr user)
ListContacts -> withUser $ \User {userId} ->
processChatCommand $ APIListContacts userId
APICreateMyAddress userId -> withUserId userId $ \user -> withChatLock "createMyAddress" . procCmd $ do
APICreateMyAddress userId -> withUserId userId $ \user -> procCmd $ do
subMode <- chatReadVar subscriptionMode
-- TODO v5.7 pass IPPQOn
(connId, cReq) <- withAgent $ \a -> createConnection a (aUserId user) True SCMContact Nothing IKPQOff subMode
@@ -1636,7 +1673,7 @@ processChatCommand' vr = \case
pure $ CRGroupCreated user groupInfo
NewGroup incognito gProfile -> withUser $ \User {userId} ->
processChatCommand $ APINewGroup userId incognito gProfile
APIAddMember groupId contactId memRole -> withUser $ \user -> withChatLock "addMember" $ do
APIAddMember groupId contactId memRole -> withUser $ \user -> withGroupLock "addMember" groupId $ do
-- TODO for large groups: no need to load all members to determine if contact is a member
(group, contact) <- withStore $ \db -> (,) <$> getGroup db vr user groupId <*> getContact db vr user contactId
assertDirectAllowed user MDSnd contact XGrpInv_
@@ -1666,7 +1703,7 @@ processChatCommand' vr = \case
Nothing -> throwChatError $ CEGroupCantResendInvitation gInfo cName
| otherwise -> throwChatError $ CEGroupDuplicateMember cName
APIJoinGroup groupId -> withUser $ \user@User {userId} -> do
withChatLock "joinGroup" . procCmd $ do
withGroupLock "joinGroup" groupId . procCmd $ do
(invitation, ct) <- withStore $ \db -> do
inv@ReceivedGroupInvitation {fromMember} <- getGroupInvitation db vr user groupId
(inv,) <$> getContactViaMember db vr user fromMember
@@ -1697,7 +1734,7 @@ processChatCommand' vr = \case
changeMemberRole user gInfo members m gEvent = do
let GroupMember {memberId = mId, memberRole = mRole, memberStatus = mStatus, memberContactId, localDisplayName = cName} = m
assertUserGroupRole gInfo $ maximum [GRAdmin, mRole, memRole]
withChatLock "memberRole" . procCmd $ do
withGroupLock "memberRole" groupId . procCmd $ do
unless (mRole == memRole) $ do
withStore' $ \db -> updateGroupMemberRole db user m memRole
case mStatus of
@@ -1719,7 +1756,7 @@ processChatCommand' vr = \case
let GroupMember {memberId = bmMemberId, memberRole = bmRole, memberProfile = bmp} = bm
assertUserGroupRole gInfo $ max GRAdmin bmRole
when (blocked == blockedByAdmin bm) $ throwChatError $ CECommandError $ if blocked then "already blocked" else "already unblocked"
withChatLock "blockForAll" . procCmd $ do
withGroupLock "blockForAll" groupId . procCmd $ do
let mrs = if blocked then MRSBlocked else MRSUnrestricted
event = XGrpMemRestrict bmMemberId MemberRestrictions {restriction = mrs}
(msg, _) <- sendGroupMessage' user gInfo remainingMembers event
@@ -1741,7 +1778,7 @@ processChatCommand' vr = \case
Nothing -> throwChatError CEGroupMemberNotFound
Just m@GroupMember {memberId = mId, memberRole = mRole, memberStatus = mStatus, memberProfile} -> do
assertUserGroupRole gInfo $ max GRAdmin mRole
withChatLock "removeMember" . procCmd $ do
withGroupLock "removeMember" groupId . procCmd $ do
case mStatus of
GSMemInvited -> do
deleteMemberConnection user m
@@ -1757,7 +1794,7 @@ processChatCommand' vr = \case
APILeaveGroup groupId -> withUser $ \user@User {userId} -> do
Group gInfo@GroupInfo {membership} members <- withStore $ \db -> getGroup db vr user groupId
filesInfo <- withStore' $ \db -> getGroupFileInfo db user gInfo
withChatLock "leaveGroup" . procCmd $ do
withGroupLock "leaveGroup" groupId . procCmd $ do
cancelFilesInProgress user filesInfo
(msg, _) <- sendGroupMessage' user gInfo members XGrpLeave
ci <- saveSndChatItem user (CDGroupSnd gInfo) msg (CISndGroupEvent SGEUserLeft)
@@ -1807,7 +1844,7 @@ processChatCommand' vr = \case
updateGroupProfileByName gName $ \p -> p {description}
ShowGroupDescription gName -> withUser $ \user ->
CRGroupDescription user <$> withStore (\db -> getGroupInfoByName db vr user gName)
APICreateGroupLink groupId mRole -> withUser $ \user -> withChatLock "createGroupLink" $ do
APICreateGroupLink groupId mRole -> withUser $ \user -> withGroupLock "createGroupLink" groupId $ do
gInfo <- withStore $ \db -> getGroupInfo db vr user groupId
assertUserGroupRole gInfo GRAdmin
when (mRole > GRMember) $ throwChatError $ CEGroupMemberInitialRole gInfo mRole
@@ -1817,14 +1854,14 @@ processChatCommand' vr = \case
(connId, cReq) <- withAgent $ \a -> createConnection a (aUserId user) True SCMContact (Just crClientData) IKPQOff subMode
withStore $ \db -> createGroupLink db user gInfo connId cReq groupLinkId mRole subMode
pure $ CRGroupLinkCreated user gInfo cReq mRole
APIGroupLinkMemberRole groupId mRole' -> withUser $ \user -> withChatLock "groupLinkMemberRole " $ do
APIGroupLinkMemberRole groupId mRole' -> withUser $ \user -> withGroupLock "groupLinkMemberRole" groupId $ do
gInfo <- withStore $ \db -> getGroupInfo db vr user groupId
(groupLinkId, groupLink, mRole) <- withStore $ \db -> getGroupLink db user gInfo
assertUserGroupRole gInfo GRAdmin
when (mRole' > GRMember) $ throwChatError $ CEGroupMemberInitialRole gInfo mRole'
when (mRole' /= mRole) $ withStore' $ \db -> setGroupLinkMemberRole db user groupLinkId mRole'
pure $ CRGroupLink user gInfo groupLink mRole'
APIDeleteGroupLink groupId -> withUser $ \user -> withChatLock "deleteGroupLink" $ do
APIDeleteGroupLink groupId -> withUser $ \user -> withGroupLock "deleteGroupLink" groupId $ do
gInfo <- withStore $ \db -> getGroupInfo db vr user groupId
deleteGroupLink' user gInfo
pure $ CRGroupLinkDeleted user gInfo
@@ -1932,19 +1969,19 @@ processChatCommand' vr = \case
ForwardImage chatName fileId -> forwardFile chatName fileId SendImage
SendFileDescription _chatName _f -> pure $ chatCmdError Nothing "TODO"
ReceiveFile fileId encrypted_ rcvInline_ filePath_ -> withUser $ \_ ->
withChatLock "receiveFile" . procCmd $ do
withFileLock "receiveFile" fileId . procCmd $ do
(user, ft) <- withStore (`getRcvFileTransferById` fileId)
encrypt <- (`fromMaybe` encrypted_) <$> chatReadVar encryptLocalFiles
ft' <- (if encrypt then setFileToEncrypt else pure) ft
receiveFile' user ft' rcvInline_ filePath_
SetFileToReceive fileId encrypted_ -> withUser $ \_ -> do
withChatLock "setFileToReceive" . procCmd $ do
withFileLock "setFileToReceive" fileId . procCmd $ do
encrypt <- (`fromMaybe` encrypted_) <$> chatReadVar encryptLocalFiles
cfArgs <- if encrypt then Just <$> (atomically . CF.randomArgs =<< asks random) else pure Nothing
withStore' $ \db -> setRcvFileToReceive db fileId cfArgs
ok_
CancelFile fileId -> withUser $ \user@User {userId} ->
withChatLock "cancelFile" . procCmd $
withFileLock "cancelFile" fileId . procCmd $
withStore (\db -> getFileTransfer db user fileId) >>= \case
FTSnd ftm@FileTransferMeta {xftpSndFile, cancelled} fts
| cancelled -> throwChatError $ CEFileCancel fileId "file already cancelled"
@@ -2074,8 +2111,18 @@ processChatCommand' vr = \case
pure $ CRVersionInfo {versionInfo, chatMigrations, agentMigrations}
DebugLocks -> lift $ do
chatLockName <- atomically . tryReadTMVar =<< asks chatLock
chatEntityLocks <- getLocks =<< asks entityLocks
agentLocks <- withAgent' debugAgentLocks
pure CRDebugLocks {chatLockName, agentLocks}
pure CRDebugLocks {chatLockName, chatEntityLocks, agentLocks}
where
getLocks ls = atomically $ M.mapKeys enityLockString . M.mapMaybe id <$> (mapM tryReadTMVar =<< readTVar ls)
enityLockString cle = case cle of
CLInvitation bs -> "Invitation " <> B.unpack bs
CLConnection connId -> "Connection " <> show connId
CLContact ctId -> "Contact " <> show ctId
CLGroup gId -> "Group " <> show gId
CLUserContact ucId -> "UserContact " <> show ucId
CLFile fId -> "File " <> show fId
GetAgentWorkers -> lift $ CRAgentWorkersSummary <$> withAgent' getAgentWorkersSummary
GetAgentWorkersDetails -> lift $ CRAgentWorkersDetails <$> withAgent' getAgentWorkersDetails
GetAgentStats -> lift $ CRAgentStats . map stat <$> withAgent' getAgentStats
@@ -2101,7 +2148,6 @@ processChatCommand' vr = \case
-- in a modified CLI app or core - the hook should return Either ChatResponse ChatCommand
CustomChatCommand _cmd -> withUser $ \user -> pure $ chatCmdError (Just user) "not supported"
where
withChatLock name action = asks chatLock >>= \l -> withLock l name action
-- below code would make command responses asynchronous where they can be slow
-- in View.hs `r'` should be defined as `id` in this case
-- procCmd :: m ChatResponse -> m ChatResponse
@@ -2167,7 +2213,7 @@ processChatCommand' vr = \case
CTLocal -> withStore $ \db -> getLocalChatItemIdByText' db user cId msg
_ -> throwChatError $ CECommandError "not supported"
connectViaContact :: User -> IncognitoEnabled -> ConnectionRequestUri 'CMContact -> CM ChatResponse
connectViaContact user@User {userId} incognito cReq@(CRContactUri ConnReqUriData {crClientData}) = withChatLock "connectViaContact" $ do
connectViaContact user@User {userId} incognito cReq@(CRContactUri ConnReqUriData {crClientData}) = withInvitationLock "connectViaContact" (strEncode cReq) $ do
let groupLinkId = crClientData >>= decodeJSON >>= \(CRDataGroup gli) -> Just gli
cReqHash = ConnReqUriHash . C.sha256Hash $ strEncode cReq
case groupLinkId of
@@ -2198,7 +2244,7 @@ processChatCommand' vr = \case
pure $ CRSentInvitation user conn incognitoProfile
connectContactViaAddress :: User -> IncognitoEnabled -> Contact -> ConnectionRequestUri 'CMContact -> CM ChatResponse
connectContactViaAddress user incognito ct cReq =
withChatLock "connectViaContact" $ do
withInvitationLock "connectContactViaAddress" (strEncode cReq) $ do
newXContactId <- XContactId <$> drgRandomBytes 16
pqSup <- chatReadVar pqExperimentalEnabled
(connId, incognitoProfile, subMode, chatV) <- requestContact user incognito cReq newXContactId False pqSup
@@ -2265,8 +2311,9 @@ processChatCommand' vr = \case
-- [incognito] filter out contacts with whom user has incognito connections
addChangedProfileContact :: User -> Contact -> [ChangedProfileContact] -> [ChangedProfileContact]
addChangedProfileContact user' ct changedCts = case contactSendConn_ ct' of
Right conn | not (connIncognito conn) && mergedProfile' /= mergedProfile ->
ChangedProfileContact ct ct' mergedProfile' conn : changedCts
Right conn
| not (connIncognito conn) && mergedProfile' /= mergedProfile ->
ChangedProfileContact ct ct' mergedProfile' conn : changedCts
_ -> changedCts
where
mergedProfile = userProfileToSend user Nothing (Just ct) False
@@ -2289,7 +2336,7 @@ processChatCommand' vr = \case
let mergedProfile = userProfileToSend user (fromLocalProfile <$> incognitoProfile) (Just ct) False
mergedProfile' = userProfileToSend user (fromLocalProfile <$> incognitoProfile) (Just ct') False
when (mergedProfile' /= mergedProfile) $
withChatLock "updateProfile" $ do
withContactLock "updateProfile" (contactId' ct) $ do
void (sendDirectContactMessage user ct' $ XInfo mergedProfile') `catchChatError` (toView . CRChatError (Just user))
lift . when (directOrUsed ct') $ createSndFeatureItems user ct ct'
pure $ CRContactPrefsUpdated user ct ct'
@@ -2334,7 +2381,7 @@ processChatCommand' vr = \case
user <- getUserByContactId db ctId
(user,) <$> getContact db vr user ctId
calls <- asks currentCalls
withChatLock "currentCall" $
withContactLock "currentCall" ctId $
atomically (TM.lookup ctId calls) >>= \case
Nothing -> throwChatError CENoCurrentCall
Just call@Call {contactId}
@@ -2988,21 +3035,16 @@ deleteGroupLink_ user gInfo conn = do
agentSubscriber :: CM' ()
agentSubscriber = do
q <- asks $ subQ . smpAgent
l <- asks chatLock
forever $ atomically (readTBQueue q) >>= process l
forever $ atomically (readTBQueue q) >>= process
where
process :: Lock -> (ACorrId, EntityId, APartyCmd 'Agent) -> CM' ()
process l (corrId, entId, APC e msg) = run $ case e of
process :: (ACorrId, EntityId, APartyCmd 'Agent) -> CM' ()
process (corrId, entId, APC e msg) = run $ case e of
SAENone -> processAgentMessageNoConn msg
SAEConn -> processAgentMessage corrId entId msg
SAERcvFile -> processAgentMsgRcvFile corrId entId msg
SAESndFile -> processAgentMsgSndFile corrId entId msg
where
run action = do
let name = "agentSubscriber entity=" <> show e <> " entId=" <> str entId <> " msg=" <> str (aCommandTag msg)
withLock' l name $ action `catchChatError'` (toView' . CRChatError Nothing)
str :: StrEncoding a => a -> String
str = B.unpack . strEncode
run action = action `catchChatError'` (toView' . CRChatError Nothing)
type AgentBatchSubscribe = AgentClient -> [ConnId] -> ExceptT AgentErrorType IO (Map ConnId (Either AgentErrorType ()))
@@ -3150,8 +3192,7 @@ subscribeUserConnections vr onlyNeeded agentBatchSubscribe user = do
forM_ err_ $ toView . CRSndFileSubError user ft
void . forkIO $ do
threadDelay 1000000
l <- asks chatLock
when (fileStatus == FSConnected) . unlessM (isFileActive fileId sndFiles) . withLock l "subscribe sendFileChunk" $
when (fileStatus == FSConnected) . unlessM (isFileActive fileId sndFiles) . withChatLock "subscribe sendFileChunk" $
sendFileChunk user ft
rcvFileSubsToView :: Map ConnId (Either AgentErrorType ()) -> Map ConnId RcvFileTransfer -> CM ()
rcvFileSubsToView rs = mapM_ (toView . uncurry (CRRcvFileSubError user)) . filterErrors . resultsFor rs
@@ -3317,11 +3358,13 @@ processAgentMessage _ connId (DEL_RCVQ srv qId err_) =
processAgentMessage _ connId DEL_CONN =
toView $ CRAgentConnDeleted (AgentConnId connId)
processAgentMessage corrId connId msg = do
vr <- chatVersionRange
-- getUserByAConnId never throws logical errors, only SEDBBusyError can be thrown here
critical (withStore' (`getUserByAConnId` AgentConnId connId)) >>= \case
Just user -> processAgentMessageConn vr user corrId connId msg `catchChatError` (toView . CRChatError (Just user))
_ -> throwChatError $ CENoConnectionUser (AgentConnId connId)
lockEntity <- critical (withStore (`getChatLockEntity` AgentConnId connId))
withEntityLock "processAgentMessage" lockEntity $ do
vr <- chatVersionRange
-- getUserByAConnId never throws logical errors, only SEDBBusyError can be thrown here
critical (withStore' (`getUserByAConnId` AgentConnId connId)) >>= \case
Just user -> processAgentMessageConn vr user corrId connId msg `catchChatError` (toView . CRChatError (Just user))
_ -> throwChatError $ CENoConnectionUser (AgentConnId connId)
-- CRITICAL error will be shown to the user as alert with restart button in Android/desktop apps.
-- SEDBBusyError will only be thrown on IO exceptions or SQLError during DB queries,
@@ -3358,18 +3401,18 @@ processAgentMessageNoConn = \case
toView $ event srv cs
processAgentMsgSndFile :: ACorrId -> SndFileId -> ACommand 'Agent 'AESndFile -> CM ()
processAgentMsgSndFile _corrId aFileId msg =
withStore' (`getUserByASndFileId` AgentSndFileId aFileId) >>= \case
Just user -> process user `catchChatError` (toView . CRChatError (Just user))
_ -> do
lift $ withAgent' (`xftpDeleteSndFileInternal` aFileId)
throwChatError $ CENoSndFileUser $ AgentSndFileId aFileId
processAgentMsgSndFile _corrId aFileId msg = do
fileId <- withStore (`getXFTPSndFileDBId` AgentSndFileId aFileId)
withFileLock "processAgentMsgSndFile" fileId $
withStore' (`getUserByASndFileId` AgentSndFileId aFileId) >>= \case
Just user -> process user fileId `catchChatError` (toView . CRChatError (Just user))
_ -> do
lift $ withAgent' (`xftpDeleteSndFileInternal` aFileId)
throwChatError $ CENoSndFileUser $ AgentSndFileId aFileId
where
process :: User -> CM ()
process user = do
(ft@FileTransferMeta {fileId, xftpRedirectFor, cancelled}, sfts) <- withStore $ \db -> do
fileId <- getXFTPSndFileDBId db user $ AgentSndFileId aFileId
getSndFileTransfer db user fileId
process :: User -> FileTransferId -> CM ()
process user fileId = do
(ft@FileTransferMeta {xftpRedirectFor, cancelled}, sfts) <- withStore $ \db -> getSndFileTransfer db user fileId
vr <- chatVersionRange
unless cancelled $ case msg of
SFPROG sndProgress sndTotal -> do
@@ -3386,11 +3429,11 @@ processAgentMsgSndFile _corrId aFileId msg =
lift $ withAgent' (`xftpDeleteSndFileInternal` aFileId)
withStore' $ \db -> createExtraSndFTDescrs db user fileId (map fileDescrText rfds)
case rfds of
[] -> sendFileError "no receiver descriptions" fileId vr ft
[] -> sendFileError "no receiver descriptions" vr ft
rfd : _ -> case [fd | fd@(FD.ValidFileDescription FD.FileDescription {chunks = [_]}) <- rfds] of
[] -> case xftpRedirectFor of
Nothing -> xftpSndFileRedirect user fileId rfd >>= toView . CRSndFileRedirectStartXFTP user ft
Just _ -> sendFileError "Prohibit chaining redirects" fileId vr ft
Just _ -> sendFileError "Prohibit chaining redirects" vr ft
rfds' -> do
-- we have 1 chunk - use it as URI whether it is redirect or not
ft' <- maybe (pure ft) (\fId -> withStore $ \db -> getFileTransferMeta db user fId) xftpRedirectFor
@@ -3439,7 +3482,7 @@ processAgentMsgSndFile _corrId aFileId msg =
| temporaryAgentError e ->
throwChatError $ CEXFTPSndFile fileId (AgentSndFileId aFileId) e
| otherwise ->
sendFileError (tshow e) fileId vr ft
sendFileError (tshow e) vr ft
where
fileDescrText :: FilePartyI p => ValidFileDescription p -> T.Text
fileDescrText = safeDecodeUtf8 . strEncode
@@ -3457,8 +3500,8 @@ processAgentMsgSndFile _corrId aFileId msg =
case L.nonEmpty fds of
Just fds' -> loopSend fds'
Nothing -> pure msgDeliveryId
sendFileError :: Text -> Int64 -> (PQSupport -> VersionRangeChat) -> FileTransferMeta -> CM ()
sendFileError err fileId vr ft = do
sendFileError :: Text -> (PQSupport -> VersionRangeChat) -> FileTransferMeta -> CM ()
sendFileError err vr ft = do
logError $ "Sent file error: " <> err
ci <- withStore $ \db -> do
liftIO $ updateFileCancelled db user fileId CIFSSndError
@@ -3480,18 +3523,18 @@ splitFileDescr rfdText = do
else fileDescr <| splitParts (partNo + 1) partSize rest
processAgentMsgRcvFile :: ACorrId -> RcvFileId -> ACommand 'Agent 'AERcvFile -> CM ()
processAgentMsgRcvFile _corrId aFileId msg =
withStore' (`getUserByARcvFileId` AgentRcvFileId aFileId) >>= \case
Just user -> process user `catchChatError` (toView . CRChatError (Just user))
_ -> do
lift $ withAgent' (`xftpDeleteRcvFile` aFileId)
throwChatError $ CENoRcvFileUser $ AgentRcvFileId aFileId
processAgentMsgRcvFile _corrId aFileId msg = do
fileId <- withStore (`getXFTPRcvFileDBId` AgentRcvFileId aFileId)
withFileLock "processAgentMsgRcvFile" fileId $
withStore' (`getUserByARcvFileId` AgentRcvFileId aFileId) >>= \case
Just user -> process user fileId `catchChatError` (toView . CRChatError (Just user))
_ -> do
lift $ withAgent' (`xftpDeleteRcvFile` aFileId)
throwChatError $ CENoRcvFileUser $ AgentRcvFileId aFileId
where
process :: User -> CM ()
process user = do
ft@RcvFileTransfer {fileId} <- withStore $ \db -> do
fileId <- getXFTPRcvFileDBId db $ AgentRcvFileId aFileId
getRcvFileTransfer db user fileId
process :: User -> FileTransferId -> CM ()
process user fileId = do
ft <- withStore $ \db -> getRcvFileTransfer db user fileId
vr <- chatVersionRange
unless (rcvFileCompleteOrCancelled ft) $ case msg of
RFPROG rcvProgress rcvTotal -> do
@@ -3597,7 +3640,8 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
-- probably this branch is never executed, so there should be no reason
-- to save message if contact hasn't been created yet - chat item isn't created anyway
withAckMessage' agentConnId meta $
void $ saveDirectRcvMSG conn meta msgBody
void $
saveDirectRcvMSG conn meta msgBody
SENT msgId ->
sentMsgDeliveryEvent conn msgId
OK ->
@@ -3634,7 +3678,6 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
(conn'', msg@RcvMessage {chatMsgEvent = ACME _ event}) <- saveDirectRcvMSG conn' msgMeta msgBody
let ct'' = ct' {activeConn = Just conn''} :: Contact
assertDirectAllowed user MDRcv ct'' $ toCMEventTag event
updateChatLock "direct message" event
case event of
XMsgNew mc -> newContentMessage ct'' mc msg msgMeta
XMsgFileDescr sharedMsgId fileDescr -> messageFileDescription ct'' sharedMsgId fileDescr
@@ -4053,7 +4096,6 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
processEvent :: MsgEncodingI e => ChatMessage e -> CM ()
processEvent chatMsg = do
(m', conn', msg@RcvMessage {chatMsgEvent = ACME _ event}) <- saveGroupRcvMsg user groupId m conn msgMeta msgBody chatMsg
updateChatLock "groupMessage" event
case event of
XMsgNew mc -> memberCanSend m' $ newGroupContentMessage gInfo m' mc msg brokerTs False
XMsgFileDescr sharedMsgId fileDescr -> memberCanSend m' $ groupMessageFileDescription gInfo m' sharedMsgId fileDescr
@@ -4389,13 +4431,6 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
toView $ CRConnectionDisabled connEntity
_ -> pure ()
updateChatLock :: MsgEncodingI enc => String -> ChatMsgEvent enc -> CM ()
updateChatLock name event = do
l <- asks chatLock
atomically $ tryReadTMVar l >>= mapM_ (swapTMVar l . (<> s))
where
s = " " <> name <> "=" <> B.unpack (strEncode $ toCMEventTag event)
-- TODO v5.7 / v6.0 - together with deprecating old group protocol establishing direct connections?
-- we could save command records only for agent APIs we process continuations for (INV)
withCompletedCommand :: forall e. AEntityI e => Connection -> ACommand 'Agent e -> (CommandData -> CM ()) -> CM ()
@@ -4433,9 +4468,9 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
-- This prevents losing the message that failed to be processed.
Left (ChatErrorStore SEDBBusyError {message}) | showCritical -> throwError $ ChatErrorAgent (CRITICAL True message) Nothing
Left e -> ackMsg msgMeta Nothing >> throwError e
where
ackMsg :: MsgMeta -> Maybe MsgReceiptInfo -> CM ()
ackMsg MsgMeta {recipient = (msgId, _)} rcpt = withAgent $ \a -> ackMessageAsync a "" cId msgId rcpt
where
ackMsg :: MsgMeta -> Maybe MsgReceiptInfo -> CM ()
ackMsg MsgMeta {recipient = (msgId, _)} rcpt = withAgent $ \a -> ackMessageAsync a "" cId msgId rcpt
sentMsgDeliveryEvent :: Connection -> AgentMsgId -> CM ()
sentMsgDeliveryEvent Connection {connId} msgId =
+8 -7
View File
@@ -59,7 +59,7 @@ import Simplex.Chat.Messages.CIContent
import Simplex.Chat.Protocol
import Simplex.Chat.Remote.AppVersion
import Simplex.Chat.Remote.Types
import Simplex.Chat.Store (AutoAccept, StoreError (..), UserContactLink, UserMsgReceiptSettings)
import Simplex.Chat.Store (AutoAccept, ChatLockEntity, StoreError (..), UserContactLink, UserMsgReceiptSettings)
import Simplex.Chat.Types
import Simplex.Chat.Types.Preferences
import Simplex.Chat.Util (liftIOEither)
@@ -165,7 +165,7 @@ defaultChatHooks =
ChatHooks
{ preCmdHook = \_ -> pure . Right,
eventHook = \_ -> pure
}
}
data DefaultAgentServers = DefaultAgentServers
{ smp :: NonEmpty SMPServerWithAuth,
@@ -208,6 +208,7 @@ data ChatController = ChatController
connNetworkStatuses :: TMap AgentConnId NetworkStatus,
subscriptionMode :: TVar SubscriptionMode,
chatLock :: Lock,
entityLocks :: TMap ChatLockEntity Lock,
sndFiles :: TVar (Map Int64 Handle),
rcvFiles :: TVar (Map Int64 Handle),
currentCalls :: TMap ContactId Call,
@@ -491,9 +492,9 @@ data ChatCommand
| GetAgentSubsDetails
| GetAgentWorkers
| GetAgentWorkersDetails
-- The parser will return this command for strings that start from "//".
-- This command should be processed in preCmdHook
| CustomChatCommand ByteString
| -- The parser will return this command for strings that start from "//".
-- This command should be processed in preCmdHook
CustomChatCommand ByteString
deriving (Show)
allowRemoteCommand :: ChatCommand -> Bool -- XXX: consider using Relay/Block/ForceLocal
@@ -731,7 +732,7 @@ data ChatResponse
| CRContactPQEnabled {user :: User, contact :: Contact, pqEnabled :: PQEncryption}
| CRSQLResult {rows :: [Text]}
| CRSlowSQLQueries {chatQueries :: [SlowSQLQuery], agentQueries :: [SlowSQLQuery]}
| CRDebugLocks {chatLockName :: Maybe String, agentLocks :: AgentLocks}
| CRDebugLocks {chatLockName :: Maybe String, chatEntityLocks :: Map String String, agentLocks :: AgentLocks}
| CRAgentStats {agentStats :: [[String]]}
| CRAgentWorkersDetails {agentWorkersDetails :: AgentWorkersDetails}
| CRAgentWorkersSummary {agentWorkersSummary :: AgentWorkersSummary}
@@ -1353,7 +1354,7 @@ handleDBErrors =
[ E.Handler $ \(e :: SQLError) ->
let se = SQL.sqlError e
busy = se == SQL.ErrorBusy || se == SQL.ErrorLocked
in pure . Left . ChatErrorStore $ if busy then SEDBBusyError $ show se else SEDBException $ show e,
in pure . Left . ChatErrorStore $ if busy then SEDBBusyError $ show se else SEDBException $ show e,
E.Handler $ \(E.SomeException e) -> pure . Left . ChatErrorStore . SEDBException $ show e
]
+1 -1
View File
@@ -48,7 +48,7 @@ import Simplex.Chat.Types
import Simplex.Chat.Types.Util
import Simplex.Messaging.Agent.Protocol (VersionSMPA, pqdrSMPAgentVersion)
import Simplex.Messaging.Compression (compress1, decompressBatch)
import Simplex.Messaging.Crypto.Ratchet (PQSupport (..), pattern PQSupportOn, pattern PQSupportOff)
import Simplex.Messaging.Crypto.Ratchet (PQSupport (..), pattern PQSupportOff, pattern PQSupportOn)
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Parsers (defaultJSON, dropPrefix, fromTextField_, fstToLower, parseAll, sumTypeJSON, taggedObjectJSON)
+1
View File
@@ -1,6 +1,7 @@
module Simplex.Chat.Store
( SQLiteStore,
StoreError (..),
ChatLockEntity (..),
UserMsgReceiptSettings (..),
UserContactLink (..),
AutoAccept (..),
+28 -1
View File
@@ -3,11 +3,13 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeOperators #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
module Simplex.Chat.Store.Connections
( getConnectionEntity,
( getChatLockEntity,
getConnectionEntity,
getConnectionEntityByConnReq,
getContactConnEntityByConnReqHash,
getConnectionsToSubscribe,
@@ -37,6 +39,31 @@ import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
import Simplex.Messaging.Crypto.Ratchet (PQSupport)
import Simplex.Messaging.Util (eitherToMaybe)
getChatLockEntity :: DB.Connection -> AgentConnId -> ExceptT StoreError IO ChatLockEntity
getChatLockEntity db agentConnId = do
((connId, connType) :. (contactId, groupMemberId, sndFileId, rcvFileId, userContactLinkId)) <-
ExceptT . firstRow id (SEConnectionNotFound agentConnId) $
DB.query
db
[sql|
SELECT connection_id, conn_type, contact_id, group_member_id, snd_file_id, rcv_file_id, user_contact_link_id
FROM connections
WHERE agent_conn_id = ?
|]
(Only agentConnId)
let err = throwError $ SEInternalError $ "connection " <> show connType <> " without entity"
case connType of
ConnMember -> maybe err (fmap CLGroup . getMemberGroupId) groupMemberId
ConnContact -> pure $ maybe (CLConnection connId) CLContact contactId
ConnSndFile -> maybe err (pure . CLFile) sndFileId
ConnRcvFile -> maybe err (pure . CLFile) rcvFileId
ConnUserContact -> maybe err (pure . CLUserContact) userContactLinkId
where
getMemberGroupId :: GroupMemberId -> ExceptT StoreError IO GroupId
getMemberGroupId groupMemberId =
ExceptT . firstRow fromOnly (SEInternalError "group member connection group_id not found") $
DB.query db "SELECT group_id FROM group_members WHERE group_member_id = ?" (Only groupMemberId)
getConnectionEntity :: DB.Connection -> (PQSupport -> VersionRangeChat) -> User -> AgentConnId -> ExceptT StoreError IO ConnectionEntity
getConnectionEntity db vr user@User {userId, userContactId} agentConnId = do
c@Connection {connType, entityId} <- getConnection_
+3 -3
View File
@@ -336,10 +336,10 @@ setSndFTAgentDeleted db User {userId} fileId = do
"UPDATE files SET agent_snd_file_deleted = 1, updated_at = ? WHERE user_id = ? AND file_id = ?"
(currentTs, userId, fileId)
getXFTPSndFileDBId :: DB.Connection -> User -> AgentSndFileId -> ExceptT StoreError IO FileTransferId
getXFTPSndFileDBId db User {userId} aSndFileId =
getXFTPSndFileDBId :: DB.Connection -> AgentSndFileId -> ExceptT StoreError IO FileTransferId
getXFTPSndFileDBId db aSndFileId =
ExceptT . firstRow fromOnly (SESndFileNotFoundXFTP aSndFileId) $
DB.query db "SELECT file_id FROM files WHERE user_id = ? AND agent_snd_file_id = ?" (userId, aSndFileId)
DB.query db "SELECT file_id FROM files WHERE agent_snd_file_id = ?" (Only aSndFileId)
getXFTPRcvFileDBId :: DB.Connection -> AgentRcvFileId -> ExceptT StoreError IO FileTransferId
getXFTPRcvFileDBId db aRcvFileId =
+9
View File
@@ -45,6 +45,15 @@ import Simplex.Messaging.Util (allFinally)
import Simplex.Messaging.Version
import UnliftIO.STM
data ChatLockEntity
= CLInvitation ByteString
| CLConnection Int64
| CLContact ContactId
| CLGroup GroupId
| CLUserContact Int64
| CLFile Int64
deriving (Eq, Ord)
-- These error type constructors must be added to mobile apps
data StoreError
= SEDuplicateName
+3 -2
View File
@@ -351,8 +351,9 @@ responseToView hu@(currentRH, user_) ChatConfig {logLevel, showReactions, showRe
<> (" :: avg: " <> sShow timeAvg <> " ms")
<> (" :: " <> plain (T.unwords $ T.lines query))
in ("Chat queries" : map viewQuery chatQueries) <> [""] <> ("Agent queries" : map viewQuery agentQueries)
CRDebugLocks {chatLockName, agentLocks} ->
CRDebugLocks {chatLockName, chatEntityLocks, agentLocks} ->
[ maybe "no chat lock" (("chat lock: " <>) . plain) chatLockName,
plain $ "chat entity locks: " <> LB.unpack (J.encode chatEntityLocks),
plain $ "agent locks: " <> LB.unpack (J.encode agentLocks)
]
CRAgentStats stats -> map (plain . intercalate ",") stats
@@ -1595,7 +1596,7 @@ standaloneUploadComplete FileTransferMeta {fileId, fileName} = \case
[] -> [fileTransferStr fileId fileName <> " upload complete."]
uris ->
fileTransferStr fileId fileName <> " upload complete. download with:"
: map plain uris
: map plain uris
sndFile :: SndFileTransfer -> StyledString
sndFile SndFileTransfer {fileId, fileName} = fileTransferStr fileId fileName
+2 -1
View File
@@ -787,7 +787,8 @@ testXFTPCancelRcvRepeat =
bob ##> "/fr 1 ./tests/tmp"
bob
<### [ "saving file 1 from alice to ./tests/tmp/testfile_1",
"started receiving file 1 (testfile) from alice"
"started receiving file 1 (testfile) from alice",
StartsWith "chat db error: SERcvFileNotFoundXFTP"
]
bob <## "completed receiving file 1 (testfile) from alice"
+6 -4
View File
@@ -179,10 +179,12 @@ testMultiWordProfileNames =
alice <# "#'Our Team' 'Bob James'> hi"
cath <# "#'Our Team' 'Bob James'> hi"
alice `send` "@'Cath Johnson' hello"
alice <## "member #'Our Team' 'Cath Johnson' does not have direct connection, creating"
alice <## "contact for member #'Our Team' 'Cath Johnson' is created"
alice <## "sent invitation to connect directly to member #'Our Team' 'Cath Johnson'"
alice <# "@'Cath Johnson' hello"
alice
<### [ "member #'Our Team' 'Cath Johnson' does not have direct connection, creating",
"contact for member #'Our Team' 'Cath Johnson' is created",
"sent invitation to connect directly to member #'Our Team' 'Cath Johnson'",
WithTime "@'Cath Johnson' hello"
]
cath <## "#'Our Team' 'Alice Jones' is creating direct contact 'Alice Jones' with you"
cath <# "'Alice Jones'> hello"
cath <## "'Alice Jones': contact is connected"