From 63f5e76f9cb33742d3c0c812af0e342dffafca0c Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Wed, 29 May 2024 08:06:01 +0100 Subject: [PATCH] agent: treat absent proxy session as a temporary error to retry sending (#1178) * agent: treat absent proxy session as a temporary error to retry sending * enable all tests --- src/Simplex/Messaging/Agent/Client.hs | 22 ++++++++- tests/SMPClient.hs | 3 ++ tests/SMPProxyTests.hs | 65 +++++++++++++++++++++++++++ 3 files changed, 88 insertions(+), 2 deletions(-) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 37f659c9f..a99d957e3 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -1039,17 +1039,34 @@ sendOrProxySMPMessage c userId destSrv cmdStr spKey_ senderId msgFlags msg = do SPFAllowProtected -> ipAddressProtected cfg destSrv SPFProhibit -> False unknownServer = maybe True (all ((destSrv /=) . protoServer)) <$> TM.lookup userId (userServers c) - sendViaProxy destSess = do + sendViaProxy destSess@(_, _, qId) = do r <- tryAgentError . withProxySession c destSess senderId ("PFWD " <> cmdStr) $ \(SMPConnectedClient smp _, proxySess) -> do liftClient SMP (clientServer smp) (proxySMPMessage smp proxySess spKey_ senderId msgFlags msg) >>= \case Right () -> pure . Just $ protocolClientServer' smp - Left proxyErr -> + Left proxyErr -> do + case proxyErr of + (ProxyProtocolError (SMP.PROXY SMP.NO_SESSION)) -> atomically deleteRelaySession + _ -> pure () throwE PROXY { proxyServer = protocolClientServer smp, relayServer = B.unpack $ strEncode destSrv, proxyErr } + where + -- checks that the current proxied relay session is the same one that was used to send the message and removes it + deleteRelaySession = + ( TM.lookup destSess (smpProxiedRelays c) + $>>= \(ProtoServerWithAuth srv _) -> tryReadSessVar (userId, srv, qId) (smpClients c) + ) + >>= \case + Just (Right (SMPConnectedClient smp' prs)) | sameClient smp' -> + tryReadSessVar destSrv prs >>= \case + Just (Right proxySess') | sameProxiedRelay proxySess' -> TM.delete destSrv prs + _ -> pure () + _ -> pure () + sameClient smp' = sessionId (thParams smp) == sessionId (thParams smp') + sameProxiedRelay proxySess' = prSessionId proxySess == prSessionId proxySess' case r of Right r' -> pure r' Left e @@ -1288,6 +1305,7 @@ temporaryAgentError = \case BROKER _ e -> tempBrokerError e SMP _ (SMP.PROXY (SMP.BROKER e)) -> tempBrokerError e PROXY _ _ (ProxyProtocolError (SMP.PROXY (SMP.BROKER e))) -> tempBrokerError e + PROXY _ _ (ProxyProtocolError (SMP.PROXY SMP.NO_SESSION)) -> True INACTIVE -> True _ -> False where diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 1828ffae6..f8c0e22c1 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -59,6 +59,9 @@ testStoreLogFile2 = "tests/tmp/smp-server-store.log.2" testStoreMsgsFile :: FilePath testStoreMsgsFile = "tests/tmp/smp-server-messages.log" +testStoreMsgsFile2 :: FilePath +testStoreMsgsFile2 = "tests/tmp/smp-server-messages.log.2" + testServerStatsBackupFile :: FilePath testServerStatsBackupFile = "tests/tmp/smp-server-stats.log" diff --git a/tests/SMPProxyTests.hs b/tests/SMPProxyTests.hs index 0c5792d7a..036c8b203 100644 --- a/tests/SMPProxyTests.hs +++ b/tests/SMPProxyTests.hs @@ -14,6 +14,7 @@ module SMPProxyTests where import AgentTests.EqInstances () import AgentTests.FunctionalAPITests +import Control.Concurrent (ThreadId) import Control.Logger.Simple import Control.Monad (forM, forM_, forever) import Control.Monad.Trans.Except (ExceptT, runExceptT) @@ -113,6 +114,8 @@ smpProxyTests = do agentDeliverMessageViaProxy ([srv1], SPMUnknown, False) ([srv2], SPMUnknown, False) C.SEd448 "hello 1" "hello 2" it "fails when fallback is prohibited" . twoServers_ proxyCfg cfgV7 $ agentViaProxyVersionError + it "retries sending when destination or proxy relay is offline" $ + agentViaProxyRetryOffline describe "stress test 1k" $ do let deliver nAgents nMsgs = agentDeliverMessagesViaProxyConc (replicate nAgents [srv1]) (map bshow [1 :: Int .. nMsgs]) it "2 agents, 250 messages" . oneServer $ deliver 2 250 @@ -290,6 +293,68 @@ agentViaProxyVersionError = where servers srvs = (initAgentServersProxy SPMUnknown SPFProhibit) {smp = userServers $ L.map noAuthSrv srvs} +agentViaProxyRetryOffline :: IO () +agentViaProxyRetryOffline = do + let srv1 = SMPServer testHost testPort testKeyHash + srv2 = SMPServer testHost testPort2 testKeyHash + msg1 = "hello 1" + msg2 = "hello 2" + aProxySrv = Just srv1 + bProxySrv = Just srv2 + withAgent 1 aCfg (servers srv1) testDB $ \alice -> + withAgent 2 aCfg (servers srv2) testDB2 $ \bob -> do + let pqEnc = CR.PQEncOn + withServer $ \_ -> do + (aliceId, bobId) <- withServer2 $ \_ -> runRight $ do + (bobId, qInfo) <- A.createConnection alice 1 True SCMInvitation Nothing (CR.IKNoPQ PQSupportOn) SMSubscribe + aliceId <- A.joinConnection bob 1 Nothing True qInfo "bob's connInfo" PQSupportOn SMSubscribe + ("", _, A.CONF confId pqSup' _ "bob's connInfo") <- get alice + liftIO $ pqSup' `shouldBe` PQSupportOn + allowConnection alice bobId confId "alice's connInfo" + get alice ##> ("", bobId, A.CON pqEnc) + get bob ##> ("", aliceId, A.INFO PQSupportOn "alice's connInfo") + get bob ##> ("", aliceId, A.CON pqEnc) + 1 <- msgId <$> A.sendMessage alice bobId pqEnc noMsgFlags msg1 + get alice ##> ("", bobId, A.SENT (baseId + 1) aProxySrv) + get bob =##> \case ("", c, Msg' _ pq msg1') -> c == aliceId && pq == pqEnc && msg1 == msg1'; _ -> False + ackMessage bob aliceId (baseId + 1) Nothing + 2 <- msgId <$> A.sendMessage bob aliceId pqEnc noMsgFlags msg2 + get bob ##> ("", aliceId, A.SENT (baseId + 2) bProxySrv) + get alice =##> \case ("", c, Msg' _ pq msg2') -> c == bobId && pq == pqEnc && msg2 == msg2'; _ -> False + ackMessage alice bobId (baseId + 2) Nothing + pure (aliceId, bobId) + runRight_ $ do + -- destination relay down + 3 <- msgId <$> A.sendMessage alice bobId pqEnc noMsgFlags msg1 + bob `down` aliceId + withServer2 $ \_ -> runRight_ $ do + bob `up` aliceId + get alice ##> ("", bobId, A.SENT (baseId + 3) aProxySrv) + get bob =##> \case ("", c, Msg' _ pq msg1') -> c == aliceId && pq == pqEnc && msg1 == msg1'; _ -> False + ackMessage bob aliceId (baseId + 3) Nothing + runRight_ $ do + -- proxy relay down + 4 <- msgId <$> A.sendMessage bob aliceId pqEnc noMsgFlags msg2 + bob `down` aliceId + withServer2 $ \_ -> runRight_ $ do + bob `up` aliceId + get bob ##> ("", aliceId, A.SENT (baseId + 4) bProxySrv) + get alice =##> \case ("", c, Msg' _ pq msg2') -> c == bobId && pq == pqEnc && msg2 == msg2'; _ -> False + ackMessage alice bobId (baseId + 4) Nothing + where + withServer :: (ThreadId -> IO a) -> IO a + withServer = withServer_ testStoreLogFile testStoreMsgsFile testPort + withServer2 :: (ThreadId -> IO a) -> IO a + withServer2 = withServer_ testStoreLogFile2 testStoreMsgsFile2 testPort2 + withServer_ storeLog storeMsgs port = + withSmpServerConfigOn (transport @TLS) proxyCfg {storeLogFile = Just storeLog, storeMsgsFile = Just storeMsgs} port + a `up` cId = nGet a =##> \case ("", "", UP _ [c]) -> c == cId; _ -> False + a `down` cId = nGet a =##> \case ("", "", DOWN _ [c]) -> c == cId; _ -> False + aCfg = agentProxyCfg {messageRetryInterval = fastMessageRetryInterval} + baseId = 3 + msgId = subtract baseId . fst + servers srv = (initAgentServersProxy SPMAlways SPFProhibit) {smp = userServers $ L.map noAuthSrv [srv]} + testNoProxy :: IO () testNoProxy = do withSmpServerConfigOn (transport @TLS) cfg testPort2 $ \_ -> do