mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 22:55:50 +00:00
differentiate agent INACTIVE error, treat as temporary (#784)
This commit is contained in:
@@ -399,7 +399,7 @@ instance ProtocolServerClient XFTPErrorType FileResponse where
|
||||
|
||||
getSMPServerClient :: forall m. AgentMonad m => AgentClient -> SMPTransportSession -> m SMPClient
|
||||
getSMPServerClient c@AgentClient {active, smpClients, msgQ} tSess@(userId, srv, _) = do
|
||||
unlessM (readTVarIO active) . throwError $ INTERNAL "agent is stopped"
|
||||
unlessM (readTVarIO active) . throwError $ INACTIVE
|
||||
atomically (getClientVar tSess smpClients)
|
||||
>>= either
|
||||
(newProtocolClient c tSess smpClients connectClient reconnectSMPClient)
|
||||
@@ -467,7 +467,7 @@ reconnectSMPClient c tSess@(_, srv, _) =
|
||||
|
||||
getNtfServerClient :: forall m. AgentMonad m => AgentClient -> NtfTransportSession -> m NtfClient
|
||||
getNtfServerClient c@AgentClient {active, ntfClients} tSess@(userId, srv, _) = do
|
||||
unlessM (readTVarIO active) . throwError $ INTERNAL "agent is stopped"
|
||||
unlessM (readTVarIO active) . throwError $ INACTIVE
|
||||
atomically (getClientVar tSess ntfClients)
|
||||
>>= either
|
||||
(newProtocolClient c tSess ntfClients connectClient $ \_ _ -> pure ())
|
||||
@@ -487,7 +487,7 @@ getNtfServerClient c@AgentClient {active, ntfClients} tSess@(userId, srv, _) = d
|
||||
|
||||
getXFTPServerClient :: forall m. AgentMonad m => AgentClient -> XFTPTransportSession -> m XFTPClient
|
||||
getXFTPServerClient c@AgentClient {active, xftpClients, useNetworkConfig} tSess@(userId, srv, _) = do
|
||||
unlessM (readTVarIO active) . throwError $ INTERNAL "agent is stopped"
|
||||
unlessM (readTVarIO active) . throwError $ INACTIVE
|
||||
atomically (getClientVar tSess xftpClients)
|
||||
>>= either
|
||||
(newProtocolClient c tSess xftpClients connectClient $ \_ _ -> pure ())
|
||||
@@ -860,6 +860,7 @@ temporaryAgentError :: AgentErrorType -> Bool
|
||||
temporaryAgentError = \case
|
||||
BROKER _ NETWORK -> True
|
||||
BROKER _ TIMEOUT -> True
|
||||
INACTIVE -> True
|
||||
_ -> False
|
||||
|
||||
temporaryOrHostError :: AgentErrorType -> Bool
|
||||
|
||||
@@ -1271,6 +1271,8 @@ data AgentErrorType
|
||||
AGENT {agentErr :: SMPAgentError}
|
||||
| -- | agent implementation or dependency errors
|
||||
INTERNAL {internalErr :: String}
|
||||
| -- | agent inactive
|
||||
INACTIVE
|
||||
deriving (Eq, Generic, Show, Exception)
|
||||
|
||||
instance ToJSON AgentErrorType where
|
||||
@@ -1385,6 +1387,7 @@ instance StrEncoding AgentErrorType where
|
||||
<|> "AGENT QUEUE " *> (AGENT . A_QUEUE <$> parseRead A.takeByteString)
|
||||
<|> "AGENT " *> (AGENT <$> parseRead1)
|
||||
<|> "INTERNAL " *> (INTERNAL <$> parseRead A.takeByteString)
|
||||
<|> "INACTIVE" *> pure INACTIVE
|
||||
where
|
||||
textP = T.unpack . safeDecodeUtf8 <$> A.takeTill (== ' ')
|
||||
strEncode = \case
|
||||
@@ -1400,6 +1403,7 @@ instance StrEncoding AgentErrorType where
|
||||
AGENT (A_QUEUE e) -> "AGENT QUEUE " <> bshow e
|
||||
AGENT e -> "AGENT " <> bshow e
|
||||
INTERNAL e -> "INTERNAL " <> bshow e
|
||||
INACTIVE -> "INACTIVE"
|
||||
where
|
||||
text = encodeUtf8 . T.pack
|
||||
|
||||
|
||||
@@ -161,6 +161,8 @@ functionalAPITests t = do
|
||||
it "should notify after HELLO timeout" $
|
||||
withSmpServer t testAsyncHelloTimeout
|
||||
describe "Message delivery" $ do
|
||||
it "should deliver message after client restart" $
|
||||
testDeliverClientRestart t
|
||||
it "should deliver messages to the user once, even if repeat delivery is made by the server (no ACK)" $
|
||||
testDuplicateMessage t
|
||||
it "should report error via msg integrity on skipped messages" $
|
||||
@@ -473,6 +475,35 @@ testAsyncHelloTimeout = do
|
||||
aliceId <- joinConnection bob 1 True cReq "bob's connInfo"
|
||||
get bob ##> ("", aliceId, ERR $ CONN NOT_ACCEPTED)
|
||||
|
||||
testDeliverClientRestart :: HasCallStack => ATransport -> IO ()
|
||||
testDeliverClientRestart t = do
|
||||
alice <- getSMPAgentClient' agentCfg initAgentServers testDB
|
||||
bob <- getSMPAgentClient' agentCfg initAgentServers testDB2
|
||||
|
||||
(aliceId, bobId) <- withSmpServerStoreMsgLogOn t testPort $ \_ -> do
|
||||
runRight $ do
|
||||
(aliceId, bobId) <- makeConnection alice bob
|
||||
exchangeGreetingsMsgId 4 alice bobId bob aliceId
|
||||
pure (aliceId, bobId)
|
||||
|
||||
("", "", DOWN _ _) <- nGet alice
|
||||
("", "", DOWN _ _) <- nGet bob
|
||||
|
||||
6 <- runRight $ sendMessage bob aliceId SMP.noMsgFlags "hello"
|
||||
|
||||
disconnectAgentClient bob
|
||||
|
||||
bob2 <- getSMPAgentClient' agentCfg initAgentServers testDB2
|
||||
|
||||
withSmpServerStoreMsgLogOn t testPort $ \_ -> do
|
||||
runRight_ $ do
|
||||
("", "", UP _ _) <- nGet alice
|
||||
|
||||
subscribeConnection bob2 aliceId
|
||||
|
||||
get bob2 ##> ("", aliceId, SENT 6)
|
||||
get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False
|
||||
|
||||
testDuplicateMessage :: HasCallStack => ATransport -> IO ()
|
||||
testDuplicateMessage t = do
|
||||
alice <- getSMPAgentClient' agentCfg initAgentServers testDB
|
||||
|
||||
Reference in New Issue
Block a user