more locks

This commit is contained in:
spaced4ndy
2024-03-27 20:58:37 +04:00
parent 1cd2972b77
commit 2a1067e15b
+26 -28
View File
@@ -3032,24 +3032,23 @@ deleteGroupLink_ user gInfo conn = do
deleteAgentConnectionAsync user $ aConnId conn
withStore' $ \db -> deleteGroupLink db user gInfo
agentSubscriber :: forall m. (MonadUnliftIO m, MonadReader ChatController m) => m ()
agentSubscriber :: forall m. ChatMonad' m => m ()
agentSubscriber = do
q <- asks $ subQ . smpAgent
l <- asks chatLock
forever $ atomically (readTBQueue q) >>= void . process l
forever $ atomically (readTBQueue q) >>= void . process
where
process :: Lock -> (ACorrId, EntityId, APartyCmd 'Agent) -> m (Either ChatError ())
process l (corrId, entId, APC e msg) = run $ case e of
process :: (ACorrId, EntityId, APartyCmd 'Agent) -> m (Either ChatError ())
process (corrId, entId, APC e msg) = run $ case e of
SAENone -> processAgentMessageNoConn msg
SAEConn -> processAgentMessage corrId entId msg
SAERcvFile -> processAgentMsgRcvFile corrId entId msg
SAESndFile -> processAgentMsgSndFile corrId entId msg
where
run action = do
let name = "agentSubscriber entity=" <> show e <> " entId=" <> str entId <> " msg=" <> str (aCommandTag msg)
withLock l name $ runExceptT $ action `catchChatError` (toView . CRChatError Nothing)
str :: StrEncoding a => a -> String
str = B.unpack . strEncode
-- str :: StrEncoding a => a -> String
-- str = B.unpack . strEncode
run action =
-- let name = "agentSubscriber entity=" <> show e <> " entId=" <> str entId <> " msg=" <> str (aCommandTag msg)
runExceptT $ action `catchChatError` (toView . CRChatError Nothing)
type AgentBatchSubscribe m = AgentClient -> [ConnId] -> ExceptT AgentErrorType m (Map ConnId (Either AgentErrorType ()))
@@ -3197,8 +3196,7 @@ subscribeUserConnections vr onlyNeeded agentBatchSubscribe user = do
forM_ err_ $ toView . CRSndFileSubError user ft
void . forkIO $ do
threadDelay 1000000
l <- asks chatLock
when (fileStatus == FSConnected) . unlessM (isFileActive fileId sndFiles) . withLock l "subscribe sendFileChunk" $
when (fileStatus == FSConnected) . unlessM (isFileActive fileId sndFiles) . withChatLock "subscribe sendFileChunk" $
sendFileChunk user ft
rcvFileSubsToView :: Map ConnId (Either AgentErrorType ()) -> Map ConnId RcvFileTransfer -> m ()
rcvFileSubsToView rs = mapM_ (toView . uncurry (CRRcvFileSubError user)) . filterErrors . resultsFor rs
@@ -3417,7 +3415,7 @@ processAgentMsgSndFile _corrId aFileId msg =
fileId <- getXFTPSndFileDBId db user $ AgentSndFileId aFileId
getSndFileTransfer db user fileId
vr <- chatVersionRange
unless cancelled $ case msg of
unless cancelled $ withFileLock "processAgentMsgSndFile" fileId $ case msg of
SFPROG sndProgress sndTotal -> do
let status = CIFSSndTransfer {sndProgress, sndTotal}
ci <- withStore $ \db -> do
@@ -3539,7 +3537,7 @@ processAgentMsgRcvFile _corrId aFileId msg =
fileId <- getXFTPRcvFileDBId db $ AgentRcvFileId aFileId
getRcvFileTransfer db user fileId
vr <- chatVersionRange
unless (rcvFileCompleteOrCancelled ft) $ case msg of
unless (rcvFileCompleteOrCancelled ft) $ withFileLock "processAgentMsgRcvFile" fileId $ case msg of
RFPROG rcvProgress rcvTotal -> do
let status = CIFSRcvTransfer {rcvProgress, rcvTotal}
ci <- withStore $ \db -> do
@@ -3625,7 +3623,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
processDirectMessage :: ACommand 'Agent e -> ConnectionEntity -> Connection -> Maybe Contact -> m ()
processDirectMessage agentMsg connEntity conn@Connection {connId, connChatVersion, peerChatVRange, viaUserContactLink, customUserProfileId, connectionCode} = \case
Nothing -> case agentMsg of
Nothing -> withEntityLock "processDirectMessage conn" (CLConnection connId) $ case agentMsg of
CONF confId pqSupport _ connInfo -> do
conn' <- processCONFpqSupport conn pqSupport
-- [incognito] send saved profile
@@ -3662,7 +3660,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
when (corrId /= "") $ withCompletedCommand conn agentMsg $ \_cmdData -> pure ()
-- TODO add debugging output
_ -> pure ()
Just ct@Contact {contactId} -> case agentMsg of
Just ct@Contact {contactId} -> withContactLock "processDirectMessage contact" contactId $ case agentMsg of
INV (ACR _ cReq) ->
-- [async agent commands] XGrpMemIntro continuation on receiving INV
withCompletedCommand conn agentMsg $ \_ ->
@@ -3682,7 +3680,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
(conn'', msg@RcvMessage {chatMsgEvent = ACME _ event}) <- saveDirectRcvMSG conn' msgMeta cmdId msgBody
let ct'' = ct' {activeConn = Just conn''} :: Contact
assertDirectAllowed user MDRcv ct'' $ toCMEventTag event
updateChatLock "direct message" event
-- updateChatLock "direct message" event
case event of
XMsgNew mc -> newContentMessage ct'' mc msg msgMeta
XMsgFileDescr sharedMsgId fileDescr -> messageFileDescription ct'' sharedMsgId fileDescr
@@ -3834,7 +3832,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
_ -> pure ()
processGroupMessage :: ACommand 'Agent e -> ConnectionEntity -> Connection -> GroupInfo -> GroupMember -> m ()
processGroupMessage agentMsg connEntity conn@Connection {connId, connectionCode} gInfo@GroupInfo {groupId, groupProfile, membership, chatSettings} m = case agentMsg of
processGroupMessage agentMsg connEntity conn@Connection {connId, connectionCode} gInfo@GroupInfo {groupId, groupProfile, membership, chatSettings} m = withGroupLock "processGroupMessage" groupId $ case agentMsg of
INV (ACR _ cReq) ->
withCompletedCommand conn agentMsg $ \CommandData {cmdFunction} ->
case cReq of
@@ -4102,7 +4100,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
processEvent :: MsgEncodingI e => CommandId -> ChatMessage e -> m ()
processEvent cmdId chatMsg = do
(m', conn', msg@RcvMessage {chatMsgEvent = ACME _ event}) <- saveGroupRcvMsg user groupId m conn msgMeta cmdId msgBody chatMsg
updateChatLock "groupMessage" event
-- updateChatLock "groupMessage" event
case event of
XMsgNew mc -> memberCanSend m' $ newGroupContentMessage gInfo m' mc msg brokerTs False
XMsgFileDescr sharedMsgId fileDescr -> memberCanSend m' $ groupMessageFileDescription gInfo m' sharedMsgId fileDescr
@@ -4249,7 +4247,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
processSndFileConn :: ACommand 'Agent e -> ConnectionEntity -> Connection -> SndFileTransfer -> m ()
processSndFileConn agentMsg connEntity conn ft@SndFileTransfer {fileId, fileName, fileStatus} =
case agentMsg of
withFileLock "processSndFileConn" fileId $ case agentMsg of
-- SMP CONF for SndFileConnection happens for direct file protocol
-- when recipient of the file "joins" connection created by the sender
CONF confId _pqSupport _ connInfo -> do
@@ -4296,7 +4294,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
processRcvFileConn :: ACommand 'Agent e -> ConnectionEntity -> Connection -> RcvFileTransfer -> m ()
processRcvFileConn agentMsg connEntity conn ft@RcvFileTransfer {fileId, fileInvitation = FileInvitation {fileName}, grpMemberId} =
case agentMsg of
withFileLock "processRcvFileConn" fileId $ case agentMsg of
INV (ACR _ cReq) ->
withCompletedCommand conn agentMsg $ \CommandData {cmdFunction} ->
case cReq of
@@ -4382,7 +4380,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
Nothing -> a
processUserContactRequest :: ACommand 'Agent e -> ConnectionEntity -> Connection -> UserContact -> m ()
processUserContactRequest agentMsg connEntity conn UserContact {userContactLinkId} = case agentMsg of
processUserContactRequest agentMsg connEntity conn UserContact {userContactLinkId} = withFileLock "processUserContactRequest" userContactLinkId $ case agentMsg of
REQ invId pqSupport _ connInfo -> do
ChatMessage {chatVRange, chatMsgEvent} <- parseChatMessage conn connInfo
case chatMsgEvent of
@@ -4443,12 +4441,12 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
toView $ CRConnectionDisabled connEntity
_ -> pure ()
updateChatLock :: MsgEncodingI enc => String -> ChatMsgEvent enc -> m ()
updateChatLock name event = do
l <- asks chatLock
atomically $ tryReadTMVar l >>= mapM_ (swapTMVar l . (<> s))
where
s = " " <> name <> "=" <> B.unpack (strEncode $ toCMEventTag event)
-- updateChatLock :: MsgEncodingI enc => String -> ChatMsgEvent enc -> m ()
-- updateChatLock name event = do
-- l <- asks chatLock
-- atomically $ tryReadTMVar l >>= mapM_ (swapTMVar l . (<> s))
-- where
-- s = " " <> name <> "=" <> B.unpack (strEncode $ toCMEventTag event)
withCompletedCommand :: forall e. AEntityI e => Connection -> ACommand 'Agent e -> (CommandData -> m ()) -> m ()
withCompletedCommand Connection {connId} agentMsg action = do