more tracking

This commit is contained in:
Evgeny Poberezkin
2024-04-28 18:27:43 +01:00
parent 7d4d0dd2c0
commit 83ab8d6ef2
2 changed files with 33 additions and 18 deletions
+31 -18
View File
@@ -3589,7 +3589,7 @@ processAgentMessage _ connId DEL_CONN =
processAgentMessage corrId connId msg = do
let acId = AgentConnId connId
acTag = aCommandTag msg
when (acTag == MSG_ || acTag == RCVD_) . lift $ trackNewDelivery acId (acTag == MSG_)
when (acTag == MSG_ || acTag == RCVD_) . lift $ trackNewDelivery acId msg
lockEntity <- critical (withStore (`getChatLockEntity` acId))
withEntityLock "processAgentMessage" lockEntity $ do
vr <- chatVersionRange
@@ -3599,14 +3599,17 @@ processAgentMessage corrId connId msg = do
_ -> throwChatError $ CENoConnectionUser acId
-- TODO: clean up deliveries
trackNewDelivery :: AgentConnId -> Bool -> CM' ()
trackNewDelivery acId isMSG = do
trackNewDelivery :: AgentConnId -> ACommand 'Agent 'AEConn -> CM' ()
trackNewDelivery acId msg = do
now <- liftIO getCurrentTime
asks agentDeliveryStatuses >>= atomically . TM.alterF (updateConn now) acId
where
(isMSG, msgBodyPfx) = case msg of
MSG _ _ msgBody -> (True, T.take 1000 $ safeDecodeUtf8 msgBody)
_ -> (False, "")
updateConn lastCmd = \case
Nothing -> Just <$> newTVar AgentDeliveryStatus {lastCmd, isMSG, connId = Nothing, eventTag = Nothing, ackSent = Nothing, pendingAcks = M.empty}
Just v -> Just v <$ modifyTVar' v (\AgentDeliveryStatus {pendingAcks} -> AgentDeliveryStatus {lastCmd, isMSG, connId = Nothing, eventTag = Nothing, ackSent = Nothing, pendingAcks = M.filter not pendingAcks})
Nothing -> Just <$> newTVar AgentDeliveryStatus {lastCmd, tracking = "create", isMSG, connId = Nothing, msgBodyPfx, eventTag = Nothing, ackSent = Nothing, pendingAcks = M.empty}
Just v -> Just v <$ modifyTVar' v (\AgentDeliveryStatus {pendingAcks} -> AgentDeliveryStatus {lastCmd, tracking = "create", isMSG, connId = Nothing, msgBodyPfx, eventTag = Nothing, ackSent = Nothing, pendingAcks = M.filter not pendingAcks})
-- CRITICAL error will be shown to the user as alert with restart button in Android/desktop apps.
-- SEDBBusyError will only be thrown on IO exceptions or SQLError during DB queries,
@@ -3845,7 +3848,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
pure $ updateEntityConnStatus acEntity connStatus
Nothing -> pure acEntity
storeDeliveryConn :: Connection -> CM ()
storeDeliveryConn Connection {connId} = lift $ agentDeliveryStatus (AgentConnId agentConnId) $ \ad -> ad {connId = Just connId}
storeDeliveryConn Connection {connId} = lift $ agentDeliveryStatus (AgentConnId agentConnId) "storeDeliveryConn" $ \ad -> ad {connId = Just connId}
agentMsgConnStatus :: ACommand 'Agent e -> Maybe ConnStatus
agentMsgConnStatus = \case
CONF {} -> Just ConnRequested
@@ -3924,7 +3927,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
(ct', conn') <- updateContactPQRcv user ct conn pqEncryption
checkIntegrityCreateItem (CDDirectRcv ct') msgMeta `catchChatError` \_ -> pure ()
(conn'', msg@RcvMessage {chatMsgEvent = ACME _ event}) <- saveDirectRcvMSG conn' msgMeta msgBody
lift $ agentDeliveryStatus (AgentConnId agentConnId) $ \ad -> ad {eventTag = Just $! tshow (toCMEventTag event)}
lift $ agentDeliveryStatus (AgentConnId agentConnId) "eventTag" $ \ad -> ad {eventTag = Just $! tshow (toCMEventTag event)}
let ct'' = ct' {activeConn = Just conn''} :: Contact
assertDirectAllowed user MDRcv ct'' $ toCMEventTag event
case event of
@@ -4328,11 +4331,17 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
_ -> messageWarning "sendXGrpMemCon: member category GCPreMember or GCPostMember is expected"
MSG msgMeta _msgFlags msgBody -> do
withAckMessage agentConnId msgMeta True $ do
lift $ agentDeliveryStatus (AgentConnId agentConnId) "group MSG" id
checkIntegrityCreateItem (CDGroupRcv gInfo m) msgMeta `catchChatError` \_ -> pure ()
lift $ agentDeliveryStatus (AgentConnId agentConnId) "after checkIntegrityCreateItem" id
forM_ aChatMsgs $ \case
Right (ACMsg _ chatMsg) ->
Right (ACMsg _ chatMsg) -> do
lift $ agentDeliveryStatus (AgentConnId agentConnId) "has event" id
processEvent chatMsg `catchChatError` \e -> toView $ CRChatError (Just user) e
Left e -> toView $ CRChatError (Just user) (ChatError . CEException $ "error parsing chat message: " <> e)
Left e -> do
lift $ agentDeliveryStatus (AgentConnId agentConnId) "has error" id
toView $ CRChatError (Just user) (ChatError . CEException $ "error parsing chat message: " <> e)
lift $ agentDeliveryStatus (AgentConnId agentConnId) "after forM_" id
checkSendRcpt $ rights aChatMsgs
-- currently only a single message is forwarded
let GroupMember {memberRole = membershipMemRole} = membership
@@ -4344,8 +4353,9 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
brokerTs = metaBrokerTs msgMeta
processEvent :: MsgEncodingI e => ChatMessage e -> CM ()
processEvent chatMsg = do
lift $ agentDeliveryStatus (AgentConnId agentConnId) "processEvent" id
(m', conn', msg@RcvMessage {chatMsgEvent = ACME _ event}) <- saveGroupRcvMsg user groupId m conn msgMeta msgBody chatMsg
lift $ agentDeliveryStatus (AgentConnId agentConnId) $ \ad -> ad {eventTag = Just $! tshow (toCMEventTag event)}
lift $ agentDeliveryStatus (AgentConnId agentConnId) "eventTag in processEvent" $ \ad -> ad {eventTag = Just $! tshow (toCMEventTag event)}
case event of
XMsgNew mc -> memberCanSend m' $ newGroupContentMessage gInfo m' mc msg brokerTs False
XMsgFileDescr sharedMsgId fileDescr -> memberCanSend m' $ groupMessageFileDescription gInfo m' sharedMsgId fileDescr
@@ -4686,7 +4696,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
let agentMsgTag = APCT (sAEntity @e) $ aCommandTag agentMsg
pending_ <- mapM readTVarIO =<< atomically . TM.lookup acId =<< asks agentDeliveryStatuses
if agentMsgTag == APCT SAEConn OK_ && corrId /= "" && maybe False (M.member ackKey . pendingAcks) pending_
then lift $ agentDeliveryStatus acId $ \ad@AgentDeliveryStatus {pendingAcks} -> ad {pendingAcks = M.adjust (const True) ackKey pendingAcks}
then lift $ agentDeliveryStatus acId "withCompletedCommand" $ \ad@AgentDeliveryStatus {pendingAcks} -> ad {pendingAcks = M.adjust (const True) ackKey pendingAcks}
else do
cmdData_ <- withStore' $ \db -> getCommandDataByCorrId db user corrId
case cmdData_ of
@@ -4720,16 +4730,19 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
Right withRcpt -> ackMsg msgMeta $ if withRcpt then Just "" else Nothing
-- If showCritical is True, then these errors don't result in ACK and show user visible alert
-- This prevents losing the message that failed to be processed.
Left (ChatErrorStore SEDBBusyError {message}) | showCritical -> throwError $ ChatErrorAgent (CRITICAL True message) Nothing
Left (ChatErrorStore SEDBBusyError {message}) | showCritical -> do
lift $ agentDeliveryStatus (AgentConnId agentConnId) "SEDBBusyError" id
throwError $ ChatErrorAgent (CRITICAL True message) Nothing
Left e -> ackMsg msgMeta Nothing >> throwError e
where
ackMsg :: MsgMeta -> Maybe MsgReceiptInfo -> CM ()
ackMsg MsgMeta {recipient = (msgId, _)} rcpt = do
ackCorrId <- drgRandomBytes 24
lift $ agentDeliveryStatus (AgentConnId agentConnId) "ackMsg" id
withAgent $ \a -> ackMessageAsync a ackCorrId cId msgId rcpt
now <- liftIO getCurrentTime
let ackKey = decodeLatin1 $ strEncode ackCorrId
lift . agentDeliveryStatus (AgentConnId agentConnId) $ \ad@AgentDeliveryStatus {pendingAcks} ->
lift . agentDeliveryStatus (AgentConnId agentConnId) "after ackMessageAsync" $ \ad@AgentDeliveryStatus {pendingAcks} ->
ad {ackSent = Just (now, ackKey), pendingAcks = M.insert ackKey False pendingAcks}
sentMsgDeliveryEvent :: Connection -> AgentMsgId -> CM ()
@@ -6029,7 +6042,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
processForwardedMsg author chatMsg = do
let body = LB.toStrict $ J.encode msg
rcvMsg@RcvMessage {chatMsgEvent = ACME _ event} <- saveGroupFwdRcvMsg user groupId m author body chatMsg
lift $ agentDeliveryStatus (AgentConnId agentConnId) $ \ad -> ad {eventTag = Just $! tshow (toCMEventTag event)}
lift $ agentDeliveryStatus (AgentConnId agentConnId) "eventTag in processForwardedMsg" $ \ad -> ad {eventTag = Just $! tshow (toCMEventTag event)}
case event of
XMsgNew mc -> memberCanSend author $ newGroupContentMessage gInfo author mc rcvMsg msgTs True
XMsgFileDescr sharedMsgId fileDescr -> memberCanSend author $ groupMessageFileDescription gInfo author sharedMsgId fileDescr
@@ -6657,7 +6670,7 @@ saveDirectRcvMSG :: Connection -> MsgMeta -> MsgBody -> CM (Connection, RcvMessa
saveDirectRcvMSG conn@Connection {connId, agentConnId} agentMsgMeta msgBody =
case parseChatMessages msgBody of
[Right (ACMsg _ ChatMessage {chatVRange, msgId = sharedMsgId_, chatMsgEvent})] -> do
lift $ agentDeliveryStatus agentConnId $ \ad -> ad {eventTag = Just $! tshow (toCMEventTag chatMsgEvent)}
lift $ agentDeliveryStatus agentConnId "eventTag in saveDirectRcvMSG" $ \ad -> ad {eventTag = Just $! tshow (toCMEventTag chatMsgEvent)}
conn' <- updatePeerChatVRange conn chatVRange
let agentMsgId = fst $ recipient agentMsgMeta
newMsg = NewRcvMessage {chatMsgEvent, msgBody}
@@ -7571,7 +7584,7 @@ xftpSndFileRedirect user ftId vfd = do
dummyFileDescr :: FileDescr
dummyFileDescr = FileDescr {fileDescrText = "", fileDescrPartNo = 0, fileDescrComplete = False}
agentDeliveryStatus :: AgentConnId -> (AgentDeliveryStatus -> AgentDeliveryStatus) -> CM' ()
agentDeliveryStatus acId f = do
agentDeliveryStatus :: AgentConnId -> Text -> (AgentDeliveryStatus -> AgentDeliveryStatus) -> CM' ()
agentDeliveryStatus acId tracking f = do
ads <- asks agentDeliveryStatuses
atomically $ TM.lookup acId ads >>= mapM_ (`modifyTVar'` f)
atomically $ TM.lookup acId ads >>= mapM_ (`modifyTVar'` \ds -> (f ds) {tracking})
+2
View File
@@ -236,8 +236,10 @@ data ChatController = ChatController
data AgentDeliveryStatus = AgentDeliveryStatus
{ lastCmd :: UTCTime,
tracking :: Text,
isMSG :: Bool, -- False for RCVD
connId :: Maybe Int64, -- chat connection ID
msgBodyPfx :: Text,
eventTag :: Maybe Text, -- tshow of ACMEventTag (for JSON instances)
ackSent :: Maybe (UTCTime, Text), -- strEncode of random CorrId
pendingAcks :: Map Text Bool