mirror of
https://github.com/simplex-chat/simplex-chat.git
synced 2026-04-27 19:26:12 +00:00
chat: delivery troubleshooting helper
This commit is contained in:
+50
-8
@@ -227,6 +227,7 @@ newChatController
|
||||
inputQ <- newTBQueueIO tbqSize
|
||||
outputQ <- newTBQueueIO tbqSize
|
||||
connNetworkStatuses <- atomically TM.empty
|
||||
agentConnStatuses <- atomically TM.empty
|
||||
subscriptionMode <- newTVarIO SMSubscribe
|
||||
chatLock <- newEmptyTMVarIO
|
||||
entityLocks <- atomically TM.empty
|
||||
@@ -263,6 +264,7 @@ newChatController
|
||||
inputQ,
|
||||
outputQ,
|
||||
connNetworkStatuses,
|
||||
agentConnStatuses,
|
||||
subscriptionMode,
|
||||
chatLock,
|
||||
entityLocks,
|
||||
@@ -2147,6 +2149,14 @@ processChatCommand' vr = \case
|
||||
chatMigrations <- map upMigration <$> withStore' (Migrations.getCurrent . DB.conn)
|
||||
agentMigrations <- withAgent getAgentMigrations
|
||||
pure $ CRVersionInfo {versionInfo, chatMigrations, agentMigrations}
|
||||
DebugAcks -> lift $ do
|
||||
acs <- mapM readTVarIO =<< readTVarIO =<< asks agentConnStatuses
|
||||
liftIO $ print acs
|
||||
-- acs' <- forM (M.toList acs) $ \(acId, agentConnStatus) -> do
|
||||
-- debugAckKey <- error "TODO: resolve connId into DebugAckKey"
|
||||
-- let da = error "TODO: DebugAck {}"
|
||||
-- pure (debugAckKey, da)
|
||||
pure $ CRDebugAcks mempty -- (M.fromList acs')
|
||||
DebugLocks -> lift $ do
|
||||
chatLockName <- atomically . tryReadTMVar =<< asks chatLock
|
||||
chatEntityLocks <- getLocks =<< asks entityLocks
|
||||
@@ -3528,18 +3538,43 @@ expireChatItems user@User {userId} ttl sync = do
|
||||
forM_ membersToDelete $ \m -> withStore' $ \db -> deleteGroupMember db user m
|
||||
|
||||
processAgentMessage :: ACorrId -> ConnId -> ACommand 'Agent 'AEConn -> CM ()
|
||||
processAgentMessage _ connId (DEL_RCVQ srv qId err_) =
|
||||
toView $ CRAgentRcvQueueDeleted (AgentConnId connId) srv (AgentQueueId qId) err_
|
||||
processAgentMessage _ connId DEL_CONN =
|
||||
toView $ CRAgentConnDeleted (AgentConnId connId)
|
||||
processAgentMessage _ connId (DEL_RCVQ srv qId err_) = do
|
||||
let acId = AgentConnId connId
|
||||
asks agentConnStatuses >>= atomically . TM.delete acId
|
||||
toView $ CRAgentRcvQueueDeleted acId srv (AgentQueueId qId) err_
|
||||
processAgentMessage _ connId DEL_CONN = do
|
||||
let acId = AgentConnId connId
|
||||
asks agentConnStatuses >>= atomically . TM.delete acId
|
||||
toView $ CRAgentConnDeleted acId
|
||||
processAgentMessage corrId connId msg = do
|
||||
lockEntity <- critical (withStore (`getChatLockEntity` AgentConnId connId))
|
||||
let acId = AgentConnId connId
|
||||
lift $ trackAgentConn acId msg
|
||||
lockEntity <- critical (withStore (`getChatLockEntity` acId))
|
||||
withEntityLock "processAgentMessage" lockEntity $ do
|
||||
vr <- chatVersionRange
|
||||
-- getUserByAConnId never throws logical errors, only SEDBBusyError can be thrown here
|
||||
critical (withStore' (`getUserByAConnId` AgentConnId connId)) >>= \case
|
||||
critical (withStore' (`getUserByAConnId` acId)) >>= \case
|
||||
Just user -> processAgentMessageConn vr user corrId connId msg `catchChatError` (toView . CRChatError (Just user))
|
||||
_ -> throwChatError $ CENoConnectionUser (AgentConnId connId)
|
||||
_ -> throwChatError $ CENoConnectionUser acId
|
||||
|
||||
trackAgentConn :: AgentConnId -> ACommand 'Agent 'AEConn -> CM' ()
|
||||
trackAgentConn acId msg = do
|
||||
now <- liftIO getCurrentTime
|
||||
asks agentConnStatuses >>= atomically . TM.alterF (updateConn now) acId
|
||||
where
|
||||
updateConn now = \case
|
||||
Nothing -> Just <$> newTVar (status now Nothing Nothing Nothing)
|
||||
Just v -> Just v <$ modifyTVar' v (\AgentConnStatus {lastMsg, ackSent, okRcvd} -> status now lastMsg ackSent okRcvd)
|
||||
status now lastMsg ackSent okRcvd = AgentConnStatus
|
||||
{ lastCmd = now,
|
||||
lastCmdTag,
|
||||
lastMsg = if isMSG then Just now else lastMsg,
|
||||
ackSent,
|
||||
okRcvd = if isOK then Just now else okRcvd
|
||||
}
|
||||
lastCmdTag = aCommandTag msg
|
||||
isMSG = lastCmdTag == MSG_
|
||||
isOK = lastCmdTag == OK_
|
||||
|
||||
-- CRITICAL error will be shown to the user as alert with restart button in Android/desktop apps.
|
||||
-- SEDBBusyError will only be thrown on IO exceptions or SQLError during DB queries,
|
||||
@@ -4643,7 +4678,13 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
||||
Left e -> ackMsg msgMeta Nothing >> throwError e
|
||||
where
|
||||
ackMsg :: MsgMeta -> Maybe MsgReceiptInfo -> CM ()
|
||||
ackMsg MsgMeta {recipient = (msgId, _)} rcpt = withAgent $ \a -> ackMessageAsync a "" cId msgId rcpt
|
||||
ackMsg MsgMeta {recipient = (msgId, _)} rcpt = do
|
||||
withAgent $ \a -> ackMessageAsync a "" cId msgId rcpt
|
||||
acs <- asks agentConnStatuses
|
||||
liftIO $ do
|
||||
let acId = AgentConnId cId
|
||||
now <- getCurrentTime
|
||||
atomically $ TM.lookup acId acs >>= mapM_ (\v -> modifyTVar' v $ \cs -> cs {ackSent = Just now})
|
||||
|
||||
sentMsgDeliveryEvent :: Connection -> AgentMsgId -> CM ()
|
||||
sentMsgDeliveryEvent Connection {connId} msgId =
|
||||
@@ -7271,6 +7312,7 @@ chatCommandP =
|
||||
"/_download " *> (APIDownloadStandaloneFile <$> A.decimal <* A.space <*> strP_ <*> cryptoFileP),
|
||||
("/quit" <|> "/q" <|> "/exit") $> QuitChat,
|
||||
("/version" <|> "/v") $> ShowVersion,
|
||||
"/debug acks" $> DebugAcks,
|
||||
"/debug locks" $> DebugLocks,
|
||||
"/debug event " *> (DebugEvent <$> jsonP),
|
||||
"/get stats" $> GetAgentStats,
|
||||
|
||||
@@ -207,6 +207,7 @@ data ChatController = ChatController
|
||||
inputQ :: TBQueue String,
|
||||
outputQ :: TBQueue (Maybe CorrId, Maybe RemoteHostId, ChatResponse),
|
||||
connNetworkStatuses :: TMap AgentConnId NetworkStatus,
|
||||
agentConnStatuses :: TMap AgentConnId (TVar AgentConnStatus),
|
||||
subscriptionMode :: TVar SubscriptionMode,
|
||||
chatLock :: Lock,
|
||||
entityLocks :: TMap ChatLockEntity Lock,
|
||||
@@ -233,6 +234,15 @@ data ChatController = ChatController
|
||||
contactMergeEnabled :: TVar Bool
|
||||
}
|
||||
|
||||
data AgentConnStatus = AgentConnStatus
|
||||
{ lastCmd :: UTCTime,
|
||||
lastCmdTag :: ACommandTag 'Agent 'AEConn,
|
||||
lastMsg :: Maybe UTCTime, -- no message yet / got an MSG to ack
|
||||
ackSent :: Maybe UTCTime,
|
||||
okRcvd :: Maybe UTCTime -- ACK delivered, resulting in OK or MSG
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
data HelpSection = HSMain | HSFiles | HSGroups | HSContacts | HSMyAddress | HSIncognito | HSMarkdown | HSMessages | HSRemote | HSSettings | HSDatabase
|
||||
deriving (Show)
|
||||
|
||||
@@ -488,6 +498,7 @@ data ChatCommand
|
||||
| APIStandaloneFileInfo FileDescriptionURI
|
||||
| QuitChat
|
||||
| ShowVersion
|
||||
| DebugAcks
|
||||
| DebugLocks
|
||||
| DebugEvent ChatResponse
|
||||
| GetAgentStats
|
||||
@@ -735,6 +746,7 @@ data ChatResponse
|
||||
| CRContactPQEnabled {user :: User, contact :: Contact, pqEnabled :: PQEncryption}
|
||||
| CRSQLResult {rows :: [Text]}
|
||||
| CRSlowSQLQueries {chatQueries :: [SlowSQLQuery], agentQueries :: [SlowSQLQuery]}
|
||||
| CRDebugAcks {debugAcks :: Map DebugAckKey DebugAck}
|
||||
| CRDebugLocks {chatLockName :: Maybe String, chatEntityLocks :: Map String String, agentLocks :: AgentLocks}
|
||||
| CRAgentStats {agentStats :: [[String]]}
|
||||
| CRAgentWorkersDetails {agentWorkersDetails :: AgentWorkersDetails}
|
||||
@@ -755,6 +767,31 @@ data ChatResponse
|
||||
| CRCustomChatResponse {user_ :: Maybe User, response :: Text}
|
||||
deriving (Show)
|
||||
|
||||
-- entity marker + id: @34
|
||||
-- using names would make a dump unshareable
|
||||
type DebugAckKey = Text
|
||||
|
||||
data DebugAck = DebugAck
|
||||
{ -- from agentConnStatuses
|
||||
lastCmd :: Maybe (Text, UTCTime), -- was there ANY command result delivered here?
|
||||
lastMsg :: Maybe UTCTime, -- if yes, the ACK should happen
|
||||
lastAck :: Maybe UTCTime, -- if sent, the OK should happen or a new MSG
|
||||
lasOK :: Maybe UTCTime, -- server got ACK, waiting for new messages
|
||||
-- from getAgentSubscriptions, via rId
|
||||
inActive :: Bool, -- should the delivery work right now?
|
||||
inPending :: Bool, -- is there a temporary error?
|
||||
-- from some receive queue
|
||||
host :: TransportHost, -- what's the server for this connection?
|
||||
hasSMPClient :: Bool, -- is there an active client for it?
|
||||
hasSubWorker :: Bool, -- a session was recently restarted and tries to resubscribe
|
||||
hasDeliveryWorker :: Bool, -- connection's delivery worker is active, double-take on session status
|
||||
-- from Connection
|
||||
connStatus_ :: ConnStatus, -- does the protocol permits delivery
|
||||
connAuthErrors :: (Int, Bool), -- number of AUTH errors before connection gets disabled
|
||||
createdAt :: UTCTime
|
||||
}
|
||||
deriving Show
|
||||
|
||||
-- some of these can only be used as command responses
|
||||
allowRemoteEvent :: ChatResponse -> Bool
|
||||
allowRemoteEvent = \case
|
||||
@@ -1456,6 +1493,8 @@ $(JQ.deriveJSON (sumTypeJSON $ dropPrefix "RCSR") ''RemoteCtrlStopReason)
|
||||
|
||||
$(JQ.deriveJSON (sumTypeJSON $ dropPrefix "RHSR") ''RemoteHostStopReason)
|
||||
|
||||
$(JQ.deriveJSON defaultJSON ''DebugAck)
|
||||
|
||||
$(JQ.deriveJSON (sumTypeJSON $ dropPrefix "CR") ''ChatResponse)
|
||||
|
||||
$(JQ.deriveFromJSON defaultJSON ''ArchiveConfig)
|
||||
|
||||
@@ -351,6 +351,7 @@ responseToView hu@(currentRH, user_) ChatConfig {logLevel, showReactions, showRe
|
||||
<> (" :: avg: " <> sShow timeAvg <> " ms")
|
||||
<> (" :: " <> plain (T.unwords $ T.lines query))
|
||||
in ("Chat queries" : map viewQuery chatQueries) <> [""] <> ("Agent queries" : map viewQuery agentQueries)
|
||||
CRDebugAcks {debugAcks} -> [plain $ LB.unpack (J.encode debugAcks)]
|
||||
CRDebugLocks {chatLockName, chatEntityLocks, agentLocks} ->
|
||||
[ maybe "no chat lock" (("chat lock: " <>) . plain) chatLockName,
|
||||
plain $ "chat entity locks: " <> LB.unpack (J.encode chatEntityLocks),
|
||||
|
||||
Reference in New Issue
Block a user