diff --git a/src/Simplex/Chat.hs b/src/Simplex/Chat.hs index c31ab1eb89..b79ad209d3 100644 --- a/src/Simplex/Chat.hs +++ b/src/Simplex/Chat.hs @@ -3032,24 +3032,23 @@ deleteGroupLink_ user gInfo conn = do deleteAgentConnectionAsync user $ aConnId conn withStore' $ \db -> deleteGroupLink db user gInfo -agentSubscriber :: forall m. (MonadUnliftIO m, MonadReader ChatController m) => m () +agentSubscriber :: forall m. ChatMonad' m => m () agentSubscriber = do q <- asks $ subQ . smpAgent - l <- asks chatLock - forever $ atomically (readTBQueue q) >>= void . process l + forever $ atomically (readTBQueue q) >>= void . process where - process :: Lock -> (ACorrId, EntityId, APartyCmd 'Agent) -> m (Either ChatError ()) - process l (corrId, entId, APC e msg) = run $ case e of + process :: (ACorrId, EntityId, APartyCmd 'Agent) -> m (Either ChatError ()) + process (corrId, entId, APC e msg) = run $ case e of SAENone -> processAgentMessageNoConn msg SAEConn -> processAgentMessage corrId entId msg SAERcvFile -> processAgentMsgRcvFile corrId entId msg SAESndFile -> processAgentMsgSndFile corrId entId msg where - run action = do - let name = "agentSubscriber entity=" <> show e <> " entId=" <> str entId <> " msg=" <> str (aCommandTag msg) - withLock l name $ runExceptT $ action `catchChatError` (toView . CRChatError Nothing) - str :: StrEncoding a => a -> String - str = B.unpack . strEncode + -- str :: StrEncoding a => a -> String + -- str = B.unpack . strEncode + run action = + -- let name = "agentSubscriber entity=" <> show e <> " entId=" <> str entId <> " msg=" <> str (aCommandTag msg) + runExceptT $ action `catchChatError` (toView . CRChatError Nothing) type AgentBatchSubscribe m = AgentClient -> [ConnId] -> ExceptT AgentErrorType m (Map ConnId (Either AgentErrorType ())) @@ -3197,8 +3196,7 @@ subscribeUserConnections vr onlyNeeded agentBatchSubscribe user = do forM_ err_ $ toView . CRSndFileSubError user ft void . forkIO $ do threadDelay 1000000 - l <- asks chatLock - when (fileStatus == FSConnected) . unlessM (isFileActive fileId sndFiles) . withLock l "subscribe sendFileChunk" $ + when (fileStatus == FSConnected) . unlessM (isFileActive fileId sndFiles) . withChatLock "subscribe sendFileChunk" $ sendFileChunk user ft rcvFileSubsToView :: Map ConnId (Either AgentErrorType ()) -> Map ConnId RcvFileTransfer -> m () rcvFileSubsToView rs = mapM_ (toView . uncurry (CRRcvFileSubError user)) . filterErrors . resultsFor rs @@ -3417,7 +3415,7 @@ processAgentMsgSndFile _corrId aFileId msg = fileId <- getXFTPSndFileDBId db user $ AgentSndFileId aFileId getSndFileTransfer db user fileId vr <- chatVersionRange - unless cancelled $ case msg of + unless cancelled $ withFileLock "processAgentMsgSndFile" fileId $ case msg of SFPROG sndProgress sndTotal -> do let status = CIFSSndTransfer {sndProgress, sndTotal} ci <- withStore $ \db -> do @@ -3539,7 +3537,7 @@ processAgentMsgRcvFile _corrId aFileId msg = fileId <- getXFTPRcvFileDBId db $ AgentRcvFileId aFileId getRcvFileTransfer db user fileId vr <- chatVersionRange - unless (rcvFileCompleteOrCancelled ft) $ case msg of + unless (rcvFileCompleteOrCancelled ft) $ withFileLock "processAgentMsgRcvFile" fileId $ case msg of RFPROG rcvProgress rcvTotal -> do let status = CIFSRcvTransfer {rcvProgress, rcvTotal} ci <- withStore $ \db -> do @@ -3625,7 +3623,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage = processDirectMessage :: ACommand 'Agent e -> ConnectionEntity -> Connection -> Maybe Contact -> m () processDirectMessage agentMsg connEntity conn@Connection {connId, connChatVersion, peerChatVRange, viaUserContactLink, customUserProfileId, connectionCode} = \case - Nothing -> case agentMsg of + Nothing -> withEntityLock "processDirectMessage conn" (CLConnection connId) $ case agentMsg of CONF confId pqSupport _ connInfo -> do conn' <- processCONFpqSupport conn pqSupport -- [incognito] send saved profile @@ -3662,7 +3660,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage = when (corrId /= "") $ withCompletedCommand conn agentMsg $ \_cmdData -> pure () -- TODO add debugging output _ -> pure () - Just ct@Contact {contactId} -> case agentMsg of + Just ct@Contact {contactId} -> withContactLock "processDirectMessage contact" contactId $ case agentMsg of INV (ACR _ cReq) -> -- [async agent commands] XGrpMemIntro continuation on receiving INV withCompletedCommand conn agentMsg $ \_ -> @@ -3682,7 +3680,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage = (conn'', msg@RcvMessage {chatMsgEvent = ACME _ event}) <- saveDirectRcvMSG conn' msgMeta cmdId msgBody let ct'' = ct' {activeConn = Just conn''} :: Contact assertDirectAllowed user MDRcv ct'' $ toCMEventTag event - updateChatLock "direct message" event + -- updateChatLock "direct message" event case event of XMsgNew mc -> newContentMessage ct'' mc msg msgMeta XMsgFileDescr sharedMsgId fileDescr -> messageFileDescription ct'' sharedMsgId fileDescr @@ -3834,7 +3832,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage = _ -> pure () processGroupMessage :: ACommand 'Agent e -> ConnectionEntity -> Connection -> GroupInfo -> GroupMember -> m () - processGroupMessage agentMsg connEntity conn@Connection {connId, connectionCode} gInfo@GroupInfo {groupId, groupProfile, membership, chatSettings} m = case agentMsg of + processGroupMessage agentMsg connEntity conn@Connection {connId, connectionCode} gInfo@GroupInfo {groupId, groupProfile, membership, chatSettings} m = withGroupLock "processGroupMessage" groupId $ case agentMsg of INV (ACR _ cReq) -> withCompletedCommand conn agentMsg $ \CommandData {cmdFunction} -> case cReq of @@ -4102,7 +4100,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage = processEvent :: MsgEncodingI e => CommandId -> ChatMessage e -> m () processEvent cmdId chatMsg = do (m', conn', msg@RcvMessage {chatMsgEvent = ACME _ event}) <- saveGroupRcvMsg user groupId m conn msgMeta cmdId msgBody chatMsg - updateChatLock "groupMessage" event + -- updateChatLock "groupMessage" event case event of XMsgNew mc -> memberCanSend m' $ newGroupContentMessage gInfo m' mc msg brokerTs False XMsgFileDescr sharedMsgId fileDescr -> memberCanSend m' $ groupMessageFileDescription gInfo m' sharedMsgId fileDescr @@ -4249,7 +4247,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage = processSndFileConn :: ACommand 'Agent e -> ConnectionEntity -> Connection -> SndFileTransfer -> m () processSndFileConn agentMsg connEntity conn ft@SndFileTransfer {fileId, fileName, fileStatus} = - case agentMsg of + withFileLock "processSndFileConn" fileId $ case agentMsg of -- SMP CONF for SndFileConnection happens for direct file protocol -- when recipient of the file "joins" connection created by the sender CONF confId _pqSupport _ connInfo -> do @@ -4296,7 +4294,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage = processRcvFileConn :: ACommand 'Agent e -> ConnectionEntity -> Connection -> RcvFileTransfer -> m () processRcvFileConn agentMsg connEntity conn ft@RcvFileTransfer {fileId, fileInvitation = FileInvitation {fileName}, grpMemberId} = - case agentMsg of + withFileLock "processRcvFileConn" fileId $ case agentMsg of INV (ACR _ cReq) -> withCompletedCommand conn agentMsg $ \CommandData {cmdFunction} -> case cReq of @@ -4382,7 +4380,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage = Nothing -> a processUserContactRequest :: ACommand 'Agent e -> ConnectionEntity -> Connection -> UserContact -> m () - processUserContactRequest agentMsg connEntity conn UserContact {userContactLinkId} = case agentMsg of + processUserContactRequest agentMsg connEntity conn UserContact {userContactLinkId} = withFileLock "processUserContactRequest" userContactLinkId $ case agentMsg of REQ invId pqSupport _ connInfo -> do ChatMessage {chatVRange, chatMsgEvent} <- parseChatMessage conn connInfo case chatMsgEvent of @@ -4443,12 +4441,12 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage = toView $ CRConnectionDisabled connEntity _ -> pure () - updateChatLock :: MsgEncodingI enc => String -> ChatMsgEvent enc -> m () - updateChatLock name event = do - l <- asks chatLock - atomically $ tryReadTMVar l >>= mapM_ (swapTMVar l . (<> s)) - where - s = " " <> name <> "=" <> B.unpack (strEncode $ toCMEventTag event) + -- updateChatLock :: MsgEncodingI enc => String -> ChatMsgEvent enc -> m () + -- updateChatLock name event = do + -- l <- asks chatLock + -- atomically $ tryReadTMVar l >>= mapM_ (swapTMVar l . (<> s)) + -- where + -- s = " " <> name <> "=" <> B.unpack (strEncode $ toCMEventTag event) withCompletedCommand :: forall e. AEntityI e => Connection -> ACommand 'Agent e -> (CommandData -> m ()) -> m () withCompletedCommand Connection {connId} agentMsg action = do