diff --git a/src/Simplex/Chat.hs b/src/Simplex/Chat.hs index 7b82d997cd..410d9aa648 100644 --- a/src/Simplex/Chat.hs +++ b/src/Simplex/Chat.hs @@ -227,6 +227,7 @@ newChatController inputQ <- newTBQueueIO tbqSize outputQ <- newTBQueueIO tbqSize connNetworkStatuses <- atomically TM.empty + agentConnStatuses <- atomically TM.empty subscriptionMode <- newTVarIO SMSubscribe chatLock <- newEmptyTMVarIO entityLocks <- atomically TM.empty @@ -263,6 +264,7 @@ newChatController inputQ, outputQ, connNetworkStatuses, + agentConnStatuses, subscriptionMode, chatLock, entityLocks, @@ -2147,6 +2149,14 @@ processChatCommand' vr = \case chatMigrations <- map upMigration <$> withStore' (Migrations.getCurrent . DB.conn) agentMigrations <- withAgent getAgentMigrations pure $ CRVersionInfo {versionInfo, chatMigrations, agentMigrations} + DebugAcks -> lift $ do + acs <- mapM readTVarIO =<< readTVarIO =<< asks agentConnStatuses + liftIO $ print acs + -- acs' <- forM (M.toList acs) $ \(acId, agentConnStatus) -> do + -- debugAckKey <- error "TODO: resolve connId into DebugAckKey" + -- let da = error "TODO: DebugAck {}" + -- pure (debugAckKey, da) + pure $ CRDebugAcks mempty -- (M.fromList acs') DebugLocks -> lift $ do chatLockName <- atomically . tryReadTMVar =<< asks chatLock chatEntityLocks <- getLocks =<< asks entityLocks @@ -3528,18 +3538,43 @@ expireChatItems user@User {userId} ttl sync = do forM_ membersToDelete $ \m -> withStore' $ \db -> deleteGroupMember db user m processAgentMessage :: ACorrId -> ConnId -> ACommand 'Agent 'AEConn -> CM () -processAgentMessage _ connId (DEL_RCVQ srv qId err_) = - toView $ CRAgentRcvQueueDeleted (AgentConnId connId) srv (AgentQueueId qId) err_ -processAgentMessage _ connId DEL_CONN = - toView $ CRAgentConnDeleted (AgentConnId connId) +processAgentMessage _ connId (DEL_RCVQ srv qId err_) = do + let acId = AgentConnId connId + asks agentConnStatuses >>= atomically . TM.delete acId + toView $ CRAgentRcvQueueDeleted acId srv (AgentQueueId qId) err_ +processAgentMessage _ connId DEL_CONN = do + let acId = AgentConnId connId + asks agentConnStatuses >>= atomically . TM.delete acId + toView $ CRAgentConnDeleted acId processAgentMessage corrId connId msg = do - lockEntity <- critical (withStore (`getChatLockEntity` AgentConnId connId)) + let acId = AgentConnId connId + lift $ trackAgentConn acId msg + lockEntity <- critical (withStore (`getChatLockEntity` acId)) withEntityLock "processAgentMessage" lockEntity $ do vr <- chatVersionRange -- getUserByAConnId never throws logical errors, only SEDBBusyError can be thrown here - critical (withStore' (`getUserByAConnId` AgentConnId connId)) >>= \case + critical (withStore' (`getUserByAConnId` acId)) >>= \case Just user -> processAgentMessageConn vr user corrId connId msg `catchChatError` (toView . CRChatError (Just user)) - _ -> throwChatError $ CENoConnectionUser (AgentConnId connId) + _ -> throwChatError $ CENoConnectionUser acId + +trackAgentConn :: AgentConnId -> ACommand 'Agent 'AEConn -> CM' () +trackAgentConn acId msg = do + now <- liftIO getCurrentTime + asks agentConnStatuses >>= atomically . TM.alterF (updateConn now) acId + where + updateConn now = \case + Nothing -> Just <$> newTVar (status now Nothing Nothing Nothing) + Just v -> Just v <$ modifyTVar' v (\AgentConnStatus {lastMsg, ackSent, okRcvd} -> status now lastMsg ackSent okRcvd) + status now lastMsg ackSent okRcvd = AgentConnStatus + { lastCmd = now, + lastCmdTag, + lastMsg = if isMSG then Just now else lastMsg, + ackSent, + okRcvd = if isOK then Just now else okRcvd + } + lastCmdTag = aCommandTag msg + isMSG = lastCmdTag == MSG_ + isOK = lastCmdTag == OK_ -- CRITICAL error will be shown to the user as alert with restart button in Android/desktop apps. -- SEDBBusyError will only be thrown on IO exceptions or SQLError during DB queries, @@ -4643,7 +4678,13 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage = Left e -> ackMsg msgMeta Nothing >> throwError e where ackMsg :: MsgMeta -> Maybe MsgReceiptInfo -> CM () - ackMsg MsgMeta {recipient = (msgId, _)} rcpt = withAgent $ \a -> ackMessageAsync a "" cId msgId rcpt + ackMsg MsgMeta {recipient = (msgId, _)} rcpt = do + withAgent $ \a -> ackMessageAsync a "" cId msgId rcpt + acs <- asks agentConnStatuses + liftIO $ do + let acId = AgentConnId cId + now <- getCurrentTime + atomically $ TM.lookup acId acs >>= mapM_ (\v -> modifyTVar' v $ \cs -> cs {ackSent = Just now}) sentMsgDeliveryEvent :: Connection -> AgentMsgId -> CM () sentMsgDeliveryEvent Connection {connId} msgId = @@ -7271,6 +7312,7 @@ chatCommandP = "/_download " *> (APIDownloadStandaloneFile <$> A.decimal <* A.space <*> strP_ <*> cryptoFileP), ("/quit" <|> "/q" <|> "/exit") $> QuitChat, ("/version" <|> "/v") $> ShowVersion, + "/debug acks" $> DebugAcks, "/debug locks" $> DebugLocks, "/debug event " *> (DebugEvent <$> jsonP), "/get stats" $> GetAgentStats, diff --git a/src/Simplex/Chat/Controller.hs b/src/Simplex/Chat/Controller.hs index 29566634f4..8dfa20a695 100644 --- a/src/Simplex/Chat/Controller.hs +++ b/src/Simplex/Chat/Controller.hs @@ -207,6 +207,7 @@ data ChatController = ChatController inputQ :: TBQueue String, outputQ :: TBQueue (Maybe CorrId, Maybe RemoteHostId, ChatResponse), connNetworkStatuses :: TMap AgentConnId NetworkStatus, + agentConnStatuses :: TMap AgentConnId (TVar AgentConnStatus), subscriptionMode :: TVar SubscriptionMode, chatLock :: Lock, entityLocks :: TMap ChatLockEntity Lock, @@ -233,6 +234,15 @@ data ChatController = ChatController contactMergeEnabled :: TVar Bool } +data AgentConnStatus = AgentConnStatus + { lastCmd :: UTCTime, + lastCmdTag :: ACommandTag 'Agent 'AEConn, + lastMsg :: Maybe UTCTime, -- no message yet / got an MSG to ack + ackSent :: Maybe UTCTime, + okRcvd :: Maybe UTCTime -- ACK delivered, resulting in OK or MSG + } + deriving (Show) + data HelpSection = HSMain | HSFiles | HSGroups | HSContacts | HSMyAddress | HSIncognito | HSMarkdown | HSMessages | HSRemote | HSSettings | HSDatabase deriving (Show) @@ -488,6 +498,7 @@ data ChatCommand | APIStandaloneFileInfo FileDescriptionURI | QuitChat | ShowVersion + | DebugAcks | DebugLocks | DebugEvent ChatResponse | GetAgentStats @@ -735,6 +746,7 @@ data ChatResponse | CRContactPQEnabled {user :: User, contact :: Contact, pqEnabled :: PQEncryption} | CRSQLResult {rows :: [Text]} | CRSlowSQLQueries {chatQueries :: [SlowSQLQuery], agentQueries :: [SlowSQLQuery]} + | CRDebugAcks {debugAcks :: Map DebugAckKey DebugAck} | CRDebugLocks {chatLockName :: Maybe String, chatEntityLocks :: Map String String, agentLocks :: AgentLocks} | CRAgentStats {agentStats :: [[String]]} | CRAgentWorkersDetails {agentWorkersDetails :: AgentWorkersDetails} @@ -755,6 +767,31 @@ data ChatResponse | CRCustomChatResponse {user_ :: Maybe User, response :: Text} deriving (Show) +-- entity marker + id: @34 +-- using names would make a dump unshareable +type DebugAckKey = Text + +data DebugAck = DebugAck + { -- from agentConnStatuses + lastCmd :: Maybe (Text, UTCTime), -- was there ANY command result delivered here? + lastMsg :: Maybe UTCTime, -- if yes, the ACK should happen + lastAck :: Maybe UTCTime, -- if sent, the OK should happen or a new MSG + lasOK :: Maybe UTCTime, -- server got ACK, waiting for new messages + -- from getAgentSubscriptions, via rId + inActive :: Bool, -- should the delivery work right now? + inPending :: Bool, -- is there a temporary error? + -- from some receive queue + host :: TransportHost, -- what's the server for this connection? + hasSMPClient :: Bool, -- is there an active client for it? + hasSubWorker :: Bool, -- a session was recently restarted and tries to resubscribe + hasDeliveryWorker :: Bool, -- connection's delivery worker is active, double-take on session status + -- from Connection + connStatus_ :: ConnStatus, -- does the protocol permits delivery + connAuthErrors :: (Int, Bool), -- number of AUTH errors before connection gets disabled + createdAt :: UTCTime + } + deriving Show + -- some of these can only be used as command responses allowRemoteEvent :: ChatResponse -> Bool allowRemoteEvent = \case @@ -1456,6 +1493,8 @@ $(JQ.deriveJSON (sumTypeJSON $ dropPrefix "RCSR") ''RemoteCtrlStopReason) $(JQ.deriveJSON (sumTypeJSON $ dropPrefix "RHSR") ''RemoteHostStopReason) +$(JQ.deriveJSON defaultJSON ''DebugAck) + $(JQ.deriveJSON (sumTypeJSON $ dropPrefix "CR") ''ChatResponse) $(JQ.deriveFromJSON defaultJSON ''ArchiveConfig) diff --git a/src/Simplex/Chat/View.hs b/src/Simplex/Chat/View.hs index a10acc884a..8a8e68de96 100644 --- a/src/Simplex/Chat/View.hs +++ b/src/Simplex/Chat/View.hs @@ -351,6 +351,7 @@ responseToView hu@(currentRH, user_) ChatConfig {logLevel, showReactions, showRe <> (" :: avg: " <> sShow timeAvg <> " ms") <> (" :: " <> plain (T.unwords $ T.lines query)) in ("Chat queries" : map viewQuery chatQueries) <> [""] <> ("Agent queries" : map viewQuery agentQueries) + CRDebugAcks {debugAcks} -> [plain $ LB.unpack (J.encode debugAcks)] CRDebugLocks {chatLockName, chatEntityLocks, agentLocks} -> [ maybe "no chat lock" (("chat lock: " <>) . plain) chatLockName, plain $ "chat entity locks: " <> LB.unpack (J.encode chatEntityLocks),