From c8a8e2c297f45b72184858f8ae098c8a55a5e1db Mon Sep 17 00:00:00 2001 From: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> Date: Thu, 29 Jun 2023 10:00:53 +0400 Subject: [PATCH] differentiate agent INACTIVE error, treat as temporary (#784) --- src/Simplex/Messaging/Agent/Client.hs | 7 +++--- src/Simplex/Messaging/Agent/Protocol.hs | 4 ++++ tests/AgentTests/FunctionalAPITests.hs | 31 +++++++++++++++++++++++++ 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 05294aaaa..d4289749b 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -399,7 +399,7 @@ instance ProtocolServerClient XFTPErrorType FileResponse where getSMPServerClient :: forall m. AgentMonad m => AgentClient -> SMPTransportSession -> m SMPClient getSMPServerClient c@AgentClient {active, smpClients, msgQ} tSess@(userId, srv, _) = do - unlessM (readTVarIO active) . throwError $ INTERNAL "agent is stopped" + unlessM (readTVarIO active) . throwError $ INACTIVE atomically (getClientVar tSess smpClients) >>= either (newProtocolClient c tSess smpClients connectClient reconnectSMPClient) @@ -467,7 +467,7 @@ reconnectSMPClient c tSess@(_, srv, _) = getNtfServerClient :: forall m. AgentMonad m => AgentClient -> NtfTransportSession -> m NtfClient getNtfServerClient c@AgentClient {active, ntfClients} tSess@(userId, srv, _) = do - unlessM (readTVarIO active) . throwError $ INTERNAL "agent is stopped" + unlessM (readTVarIO active) . throwError $ INACTIVE atomically (getClientVar tSess ntfClients) >>= either (newProtocolClient c tSess ntfClients connectClient $ \_ _ -> pure ()) @@ -487,7 +487,7 @@ getNtfServerClient c@AgentClient {active, ntfClients} tSess@(userId, srv, _) = d getXFTPServerClient :: forall m. AgentMonad m => AgentClient -> XFTPTransportSession -> m XFTPClient getXFTPServerClient c@AgentClient {active, xftpClients, useNetworkConfig} tSess@(userId, srv, _) = do - unlessM (readTVarIO active) . throwError $ INTERNAL "agent is stopped" + unlessM (readTVarIO active) . throwError $ INACTIVE atomically (getClientVar tSess xftpClients) >>= either (newProtocolClient c tSess xftpClients connectClient $ \_ _ -> pure ()) @@ -860,6 +860,7 @@ temporaryAgentError :: AgentErrorType -> Bool temporaryAgentError = \case BROKER _ NETWORK -> True BROKER _ TIMEOUT -> True + INACTIVE -> True _ -> False temporaryOrHostError :: AgentErrorType -> Bool diff --git a/src/Simplex/Messaging/Agent/Protocol.hs b/src/Simplex/Messaging/Agent/Protocol.hs index 1f993b3b6..1a4f1f83f 100644 --- a/src/Simplex/Messaging/Agent/Protocol.hs +++ b/src/Simplex/Messaging/Agent/Protocol.hs @@ -1271,6 +1271,8 @@ data AgentErrorType AGENT {agentErr :: SMPAgentError} | -- | agent implementation or dependency errors INTERNAL {internalErr :: String} + | -- | agent inactive + INACTIVE deriving (Eq, Generic, Show, Exception) instance ToJSON AgentErrorType where @@ -1385,6 +1387,7 @@ instance StrEncoding AgentErrorType where <|> "AGENT QUEUE " *> (AGENT . A_QUEUE <$> parseRead A.takeByteString) <|> "AGENT " *> (AGENT <$> parseRead1) <|> "INTERNAL " *> (INTERNAL <$> parseRead A.takeByteString) + <|> "INACTIVE" *> pure INACTIVE where textP = T.unpack . safeDecodeUtf8 <$> A.takeTill (== ' ') strEncode = \case @@ -1400,6 +1403,7 @@ instance StrEncoding AgentErrorType where AGENT (A_QUEUE e) -> "AGENT QUEUE " <> bshow e AGENT e -> "AGENT " <> bshow e INTERNAL e -> "INTERNAL " <> bshow e + INACTIVE -> "INACTIVE" where text = encodeUtf8 . T.pack diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 3d30f6548..df0d9dfb5 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -161,6 +161,8 @@ functionalAPITests t = do it "should notify after HELLO timeout" $ withSmpServer t testAsyncHelloTimeout describe "Message delivery" $ do + it "should deliver message after client restart" $ + testDeliverClientRestart t it "should deliver messages to the user once, even if repeat delivery is made by the server (no ACK)" $ testDuplicateMessage t it "should report error via msg integrity on skipped messages" $ @@ -473,6 +475,35 @@ testAsyncHelloTimeout = do aliceId <- joinConnection bob 1 True cReq "bob's connInfo" get bob ##> ("", aliceId, ERR $ CONN NOT_ACCEPTED) +testDeliverClientRestart :: HasCallStack => ATransport -> IO () +testDeliverClientRestart t = do + alice <- getSMPAgentClient' agentCfg initAgentServers testDB + bob <- getSMPAgentClient' agentCfg initAgentServers testDB2 + + (aliceId, bobId) <- withSmpServerStoreMsgLogOn t testPort $ \_ -> do + runRight $ do + (aliceId, bobId) <- makeConnection alice bob + exchangeGreetingsMsgId 4 alice bobId bob aliceId + pure (aliceId, bobId) + + ("", "", DOWN _ _) <- nGet alice + ("", "", DOWN _ _) <- nGet bob + + 6 <- runRight $ sendMessage bob aliceId SMP.noMsgFlags "hello" + + disconnectAgentClient bob + + bob2 <- getSMPAgentClient' agentCfg initAgentServers testDB2 + + withSmpServerStoreMsgLogOn t testPort $ \_ -> do + runRight_ $ do + ("", "", UP _ _) <- nGet alice + + subscribeConnection bob2 aliceId + + get bob2 ##> ("", aliceId, SENT 6) + get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False + testDuplicateMessage :: HasCallStack => ATransport -> IO () testDuplicateMessage t = do alice <- getSMPAgentClient' agentCfg initAgentServers testDB