mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 14:16:00 +00:00
agent: treat absent proxy session as a temporary error to retry sending (#1178)
* agent: treat absent proxy session as a temporary error to retry sending * enable all tests
This commit is contained in:
committed by
GitHub
parent
ee052a454e
commit
63f5e76f9c
@@ -1039,17 +1039,34 @@ sendOrProxySMPMessage c userId destSrv cmdStr spKey_ senderId msgFlags msg = do
|
||||
SPFAllowProtected -> ipAddressProtected cfg destSrv
|
||||
SPFProhibit -> False
|
||||
unknownServer = maybe True (all ((destSrv /=) . protoServer)) <$> TM.lookup userId (userServers c)
|
||||
sendViaProxy destSess = do
|
||||
sendViaProxy destSess@(_, _, qId) = do
|
||||
r <- tryAgentError . withProxySession c destSess senderId ("PFWD " <> cmdStr) $ \(SMPConnectedClient smp _, proxySess) -> do
|
||||
liftClient SMP (clientServer smp) (proxySMPMessage smp proxySess spKey_ senderId msgFlags msg) >>= \case
|
||||
Right () -> pure . Just $ protocolClientServer' smp
|
||||
Left proxyErr ->
|
||||
Left proxyErr -> do
|
||||
case proxyErr of
|
||||
(ProxyProtocolError (SMP.PROXY SMP.NO_SESSION)) -> atomically deleteRelaySession
|
||||
_ -> pure ()
|
||||
throwE
|
||||
PROXY
|
||||
{ proxyServer = protocolClientServer smp,
|
||||
relayServer = B.unpack $ strEncode destSrv,
|
||||
proxyErr
|
||||
}
|
||||
where
|
||||
-- checks that the current proxied relay session is the same one that was used to send the message and removes it
|
||||
deleteRelaySession =
|
||||
( TM.lookup destSess (smpProxiedRelays c)
|
||||
$>>= \(ProtoServerWithAuth srv _) -> tryReadSessVar (userId, srv, qId) (smpClients c)
|
||||
)
|
||||
>>= \case
|
||||
Just (Right (SMPConnectedClient smp' prs)) | sameClient smp' ->
|
||||
tryReadSessVar destSrv prs >>= \case
|
||||
Just (Right proxySess') | sameProxiedRelay proxySess' -> TM.delete destSrv prs
|
||||
_ -> pure ()
|
||||
_ -> pure ()
|
||||
sameClient smp' = sessionId (thParams smp) == sessionId (thParams smp')
|
||||
sameProxiedRelay proxySess' = prSessionId proxySess == prSessionId proxySess'
|
||||
case r of
|
||||
Right r' -> pure r'
|
||||
Left e
|
||||
@@ -1288,6 +1305,7 @@ temporaryAgentError = \case
|
||||
BROKER _ e -> tempBrokerError e
|
||||
SMP _ (SMP.PROXY (SMP.BROKER e)) -> tempBrokerError e
|
||||
PROXY _ _ (ProxyProtocolError (SMP.PROXY (SMP.BROKER e))) -> tempBrokerError e
|
||||
PROXY _ _ (ProxyProtocolError (SMP.PROXY SMP.NO_SESSION)) -> True
|
||||
INACTIVE -> True
|
||||
_ -> False
|
||||
where
|
||||
|
||||
@@ -59,6 +59,9 @@ testStoreLogFile2 = "tests/tmp/smp-server-store.log.2"
|
||||
testStoreMsgsFile :: FilePath
|
||||
testStoreMsgsFile = "tests/tmp/smp-server-messages.log"
|
||||
|
||||
testStoreMsgsFile2 :: FilePath
|
||||
testStoreMsgsFile2 = "tests/tmp/smp-server-messages.log.2"
|
||||
|
||||
testServerStatsBackupFile :: FilePath
|
||||
testServerStatsBackupFile = "tests/tmp/smp-server-stats.log"
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@ module SMPProxyTests where
|
||||
|
||||
import AgentTests.EqInstances ()
|
||||
import AgentTests.FunctionalAPITests
|
||||
import Control.Concurrent (ThreadId)
|
||||
import Control.Logger.Simple
|
||||
import Control.Monad (forM, forM_, forever)
|
||||
import Control.Monad.Trans.Except (ExceptT, runExceptT)
|
||||
@@ -113,6 +114,8 @@ smpProxyTests = do
|
||||
agentDeliverMessageViaProxy ([srv1], SPMUnknown, False) ([srv2], SPMUnknown, False) C.SEd448 "hello 1" "hello 2"
|
||||
it "fails when fallback is prohibited" . twoServers_ proxyCfg cfgV7 $
|
||||
agentViaProxyVersionError
|
||||
it "retries sending when destination or proxy relay is offline" $
|
||||
agentViaProxyRetryOffline
|
||||
describe "stress test 1k" $ do
|
||||
let deliver nAgents nMsgs = agentDeliverMessagesViaProxyConc (replicate nAgents [srv1]) (map bshow [1 :: Int .. nMsgs])
|
||||
it "2 agents, 250 messages" . oneServer $ deliver 2 250
|
||||
@@ -290,6 +293,68 @@ agentViaProxyVersionError =
|
||||
where
|
||||
servers srvs = (initAgentServersProxy SPMUnknown SPFProhibit) {smp = userServers $ L.map noAuthSrv srvs}
|
||||
|
||||
agentViaProxyRetryOffline :: IO ()
|
||||
agentViaProxyRetryOffline = do
|
||||
let srv1 = SMPServer testHost testPort testKeyHash
|
||||
srv2 = SMPServer testHost testPort2 testKeyHash
|
||||
msg1 = "hello 1"
|
||||
msg2 = "hello 2"
|
||||
aProxySrv = Just srv1
|
||||
bProxySrv = Just srv2
|
||||
withAgent 1 aCfg (servers srv1) testDB $ \alice ->
|
||||
withAgent 2 aCfg (servers srv2) testDB2 $ \bob -> do
|
||||
let pqEnc = CR.PQEncOn
|
||||
withServer $ \_ -> do
|
||||
(aliceId, bobId) <- withServer2 $ \_ -> runRight $ do
|
||||
(bobId, qInfo) <- A.createConnection alice 1 True SCMInvitation Nothing (CR.IKNoPQ PQSupportOn) SMSubscribe
|
||||
aliceId <- A.joinConnection bob 1 Nothing True qInfo "bob's connInfo" PQSupportOn SMSubscribe
|
||||
("", _, A.CONF confId pqSup' _ "bob's connInfo") <- get alice
|
||||
liftIO $ pqSup' `shouldBe` PQSupportOn
|
||||
allowConnection alice bobId confId "alice's connInfo"
|
||||
get alice ##> ("", bobId, A.CON pqEnc)
|
||||
get bob ##> ("", aliceId, A.INFO PQSupportOn "alice's connInfo")
|
||||
get bob ##> ("", aliceId, A.CON pqEnc)
|
||||
1 <- msgId <$> A.sendMessage alice bobId pqEnc noMsgFlags msg1
|
||||
get alice ##> ("", bobId, A.SENT (baseId + 1) aProxySrv)
|
||||
get bob =##> \case ("", c, Msg' _ pq msg1') -> c == aliceId && pq == pqEnc && msg1 == msg1'; _ -> False
|
||||
ackMessage bob aliceId (baseId + 1) Nothing
|
||||
2 <- msgId <$> A.sendMessage bob aliceId pqEnc noMsgFlags msg2
|
||||
get bob ##> ("", aliceId, A.SENT (baseId + 2) bProxySrv)
|
||||
get alice =##> \case ("", c, Msg' _ pq msg2') -> c == bobId && pq == pqEnc && msg2 == msg2'; _ -> False
|
||||
ackMessage alice bobId (baseId + 2) Nothing
|
||||
pure (aliceId, bobId)
|
||||
runRight_ $ do
|
||||
-- destination relay down
|
||||
3 <- msgId <$> A.sendMessage alice bobId pqEnc noMsgFlags msg1
|
||||
bob `down` aliceId
|
||||
withServer2 $ \_ -> runRight_ $ do
|
||||
bob `up` aliceId
|
||||
get alice ##> ("", bobId, A.SENT (baseId + 3) aProxySrv)
|
||||
get bob =##> \case ("", c, Msg' _ pq msg1') -> c == aliceId && pq == pqEnc && msg1 == msg1'; _ -> False
|
||||
ackMessage bob aliceId (baseId + 3) Nothing
|
||||
runRight_ $ do
|
||||
-- proxy relay down
|
||||
4 <- msgId <$> A.sendMessage bob aliceId pqEnc noMsgFlags msg2
|
||||
bob `down` aliceId
|
||||
withServer2 $ \_ -> runRight_ $ do
|
||||
bob `up` aliceId
|
||||
get bob ##> ("", aliceId, A.SENT (baseId + 4) bProxySrv)
|
||||
get alice =##> \case ("", c, Msg' _ pq msg2') -> c == bobId && pq == pqEnc && msg2 == msg2'; _ -> False
|
||||
ackMessage alice bobId (baseId + 4) Nothing
|
||||
where
|
||||
withServer :: (ThreadId -> IO a) -> IO a
|
||||
withServer = withServer_ testStoreLogFile testStoreMsgsFile testPort
|
||||
withServer2 :: (ThreadId -> IO a) -> IO a
|
||||
withServer2 = withServer_ testStoreLogFile2 testStoreMsgsFile2 testPort2
|
||||
withServer_ storeLog storeMsgs port =
|
||||
withSmpServerConfigOn (transport @TLS) proxyCfg {storeLogFile = Just storeLog, storeMsgsFile = Just storeMsgs} port
|
||||
a `up` cId = nGet a =##> \case ("", "", UP _ [c]) -> c == cId; _ -> False
|
||||
a `down` cId = nGet a =##> \case ("", "", DOWN _ [c]) -> c == cId; _ -> False
|
||||
aCfg = agentProxyCfg {messageRetryInterval = fastMessageRetryInterval}
|
||||
baseId = 3
|
||||
msgId = subtract baseId . fst
|
||||
servers srv = (initAgentServersProxy SPMAlways SPFProhibit) {smp = userServers $ L.map noAuthSrv [srv]}
|
||||
|
||||
testNoProxy :: IO ()
|
||||
testNoProxy = do
|
||||
withSmpServerConfigOn (transport @TLS) cfg testPort2 $ \_ -> do
|
||||
|
||||
Reference in New Issue
Block a user