mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-27 06:25:15 +00:00
agent: include server address string into BROKER errors (#575)
* agent: include server address string into BROKER errors * eol Co-authored-by: JRoberts <8711996+jr-simplex@users.noreply.github.com> Co-authored-by: JRoberts <8711996+jr-simplex@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
dc920d90d9
commit
e4842f4f47
@@ -878,7 +878,7 @@ runCommandProcessing c@AgentClient {subQ} server_ = do
|
||||
tryCommand action = withRetryInterval ri $ \loop ->
|
||||
tryError action >>= \case
|
||||
Left e
|
||||
| temporaryAgentError e || e == BROKER HOST -> retrySndOp c loop
|
||||
| temporaryOrHostError e -> retrySndOp c loop
|
||||
| otherwise -> cmdError e
|
||||
Right () -> withStore' c (`deleteCommand` cmdId)
|
||||
tryWithLock name = tryCommand . withConnLock c connId name
|
||||
@@ -1016,7 +1016,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh
|
||||
_
|
||||
-- for other operations BROKER HOST is treated as a permanent error (e.g., when connecting to the server),
|
||||
-- the message sending would be retried
|
||||
| temporaryAgentError e || e == BROKER HOST -> do
|
||||
| temporaryOrHostError e -> do
|
||||
let timeoutSel = if msgType == AM_HELLO_ then helloTimeout else messageTimeout
|
||||
ifM (msgExpired timeoutSel) (notifyDel msgId err) (retrySndOp c loop)
|
||||
| otherwise -> notifyDel msgId err
|
||||
@@ -1242,7 +1242,7 @@ registerNtfToken' c suppliedDeviceToken suppliedNtfMode =
|
||||
replaceToken tknId = do
|
||||
ns <- asks ntfSupervisor
|
||||
tryReplace ns `catchError` \e ->
|
||||
if temporaryAgentError e || e == BROKER HOST
|
||||
if temporaryOrHostError e
|
||||
then throwError e
|
||||
else do
|
||||
withStore' c $ \db -> removeNtfToken db tkn
|
||||
@@ -1577,7 +1577,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm
|
||||
ignored = pure "END from disconnected client - ignored"
|
||||
_ -> do
|
||||
logServer "<--" c srv rId $ "unexpected: " <> bshow cmd
|
||||
notify . ERR $ BROKER UNEXPECTED
|
||||
notify . ERR $ BROKER (B.unpack $ strEncode srv) UNEXPECTED
|
||||
where
|
||||
notify :: ACommand 'Agent -> m ()
|
||||
notify msg = atomically $ writeTBQueue subQ ("", connId, msg)
|
||||
|
||||
@@ -34,6 +34,7 @@ module Simplex.Messaging.Agent.Client
|
||||
sendConfirmation,
|
||||
sendInvitation,
|
||||
temporaryAgentError,
|
||||
temporaryOrHostError,
|
||||
secureQueue,
|
||||
enableQueueNotifications,
|
||||
disableQueueNotifications,
|
||||
@@ -108,6 +109,7 @@ import Data.Text.Encoding
|
||||
import Data.Word (Word16)
|
||||
import qualified Database.SQLite.Simple as DB
|
||||
import GHC.Generics (Generic)
|
||||
import Network.Socket (HostName)
|
||||
import Simplex.Messaging.Agent.Env.SQLite
|
||||
import Simplex.Messaging.Agent.Lock
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
@@ -279,13 +281,13 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} srv = do
|
||||
atomically (getClientVar srv smpClients)
|
||||
>>= either
|
||||
(newProtocolClient c srv smpClients connectClient reconnectClient)
|
||||
(waitForProtocolClient c)
|
||||
(waitForProtocolClient c srv)
|
||||
where
|
||||
connectClient :: m SMPClient
|
||||
connectClient = do
|
||||
cfg <- getClientConfig c smpCfg
|
||||
u <- askUnliftIO
|
||||
liftEitherError (protocolClientError SMP) (getProtocolClient srv cfg (Just msgQ) $ clientDisconnected u)
|
||||
liftEitherError (protocolClientError SMP $ B.unpack $ strEncode srv) (getProtocolClient srv cfg (Just msgQ) $ clientDisconnected u)
|
||||
|
||||
clientDisconnected :: UnliftIO m -> SMPClient -> IO ()
|
||||
clientDisconnected u client = do
|
||||
@@ -350,12 +352,12 @@ getNtfServerClient c@AgentClient {active, ntfClients} srv = do
|
||||
atomically (getClientVar srv ntfClients)
|
||||
>>= either
|
||||
(newProtocolClient c srv ntfClients connectClient $ pure ())
|
||||
(waitForProtocolClient c)
|
||||
(waitForProtocolClient c srv)
|
||||
where
|
||||
connectClient :: m NtfClient
|
||||
connectClient = do
|
||||
cfg <- getClientConfig c ntfCfg
|
||||
liftEitherError (protocolClientError NTF) (getProtocolClient srv cfg Nothing clientDisconnected)
|
||||
liftEitherError (protocolClientError NTF $ B.unpack $ strEncode srv) (getProtocolClient srv cfg Nothing clientDisconnected)
|
||||
|
||||
clientDisconnected :: NtfClient -> IO ()
|
||||
clientDisconnected client = do
|
||||
@@ -372,14 +374,14 @@ getClientVar srv clients = maybe (Left <$> newClientVar) (pure . Right) =<< TM.l
|
||||
TM.insert srv var clients
|
||||
pure var
|
||||
|
||||
waitForProtocolClient :: AgentMonad m => AgentClient -> ClientVar msg -> m (ProtocolClient msg)
|
||||
waitForProtocolClient c clientVar = do
|
||||
waitForProtocolClient :: (AgentMonad m, ProtocolTypeI (ProtoType msg)) => AgentClient -> ProtoServer msg -> ClientVar msg -> m (ProtocolClient msg)
|
||||
waitForProtocolClient c srv clientVar = do
|
||||
NetworkConfig {tcpConnectTimeout} <- readTVarIO $ useNetworkConfig c
|
||||
client_ <- liftIO $ tcpConnectTimeout `timeout` atomically (readTMVar clientVar)
|
||||
liftEither $ case client_ of
|
||||
Just (Right smpClient) -> Right smpClient
|
||||
Just (Left e) -> Left e
|
||||
Nothing -> Left $ BROKER TIMEOUT
|
||||
Nothing -> Left $ BROKER (B.unpack $ strEncode srv) TIMEOUT
|
||||
|
||||
newProtocolClient ::
|
||||
forall msg m.
|
||||
@@ -499,24 +501,24 @@ withLogClient_ c srv qId cmdStr action = do
|
||||
logServer "<--" c srv qId "OK"
|
||||
return res
|
||||
|
||||
withClient :: forall m msg a. (AgentMonad m, ProtocolServerClient msg) => AgentClient -> ProtoServer msg -> (ProtocolClient msg -> ExceptT ProtocolClientError IO a) -> m a
|
||||
withClient c srv action = withClient_ c srv $ liftClient (clientProtocolError @msg) . action
|
||||
withClient :: forall m msg a. (AgentMonad m, ProtocolServerClient msg, ProtocolTypeI (ProtoType msg)) => AgentClient -> ProtoServer msg -> (ProtocolClient msg -> ExceptT ProtocolClientError IO a) -> m a
|
||||
withClient c srv action = withClient_ c srv $ \client -> liftClient (clientProtocolError @msg) (clientServer client) $ action client
|
||||
|
||||
withLogClient :: forall m msg a. (AgentMonad m, ProtocolServerClient msg) => AgentClient -> ProtoServer msg -> QueueId -> ByteString -> (ProtocolClient msg -> ExceptT ProtocolClientError IO a) -> m a
|
||||
withLogClient c srv qId cmdStr action = withLogClient_ c srv qId cmdStr $ liftClient (clientProtocolError @msg) . action
|
||||
withLogClient :: forall m msg a. (AgentMonad m, ProtocolServerClient msg, ProtocolTypeI (ProtoType msg)) => AgentClient -> ProtoServer msg -> QueueId -> ByteString -> (ProtocolClient msg -> ExceptT ProtocolClientError IO a) -> m a
|
||||
withLogClient c srv qId cmdStr action = withLogClient_ c srv qId cmdStr $ \client -> liftClient (clientProtocolError @msg) (clientServer client) $ action client
|
||||
|
||||
liftClient :: AgentMonad m => (ErrorType -> AgentErrorType) -> ExceptT ProtocolClientError IO a -> m a
|
||||
liftClient = liftError . protocolClientError
|
||||
liftClient :: AgentMonad m => (ErrorType -> AgentErrorType) -> HostName -> ExceptT ProtocolClientError IO a -> m a
|
||||
liftClient protocolError_ = liftError . protocolClientError protocolError_
|
||||
|
||||
protocolClientError :: (ErrorType -> AgentErrorType) -> ProtocolClientError -> AgentErrorType
|
||||
protocolClientError protocolError_ = \case
|
||||
protocolClientError :: (ErrorType -> AgentErrorType) -> HostName -> ProtocolClientError -> AgentErrorType
|
||||
protocolClientError protocolError_ host = \case
|
||||
PCEProtocolError e -> protocolError_ e
|
||||
PCEResponseError e -> BROKER $ RESPONSE e
|
||||
PCEUnexpectedResponse _ -> BROKER UNEXPECTED
|
||||
PCEResponseTimeout -> BROKER TIMEOUT
|
||||
PCENetworkError -> BROKER NETWORK
|
||||
PCEIncompatibleHost -> BROKER HOST
|
||||
PCETransportError e -> BROKER $ TRANSPORT e
|
||||
PCEResponseError e -> BROKER host $ RESPONSE e
|
||||
PCEUnexpectedResponse _ -> BROKER host UNEXPECTED
|
||||
PCEResponseTimeout -> BROKER host TIMEOUT
|
||||
PCENetworkError -> BROKER host NETWORK
|
||||
PCEIncompatibleHost -> BROKER host HOST
|
||||
PCETransportError e -> BROKER host $ TRANSPORT e
|
||||
e@PCESignatureError {} -> INTERNAL $ show e
|
||||
e@PCEIOError {} -> INTERNAL $ show e
|
||||
|
||||
@@ -552,11 +554,12 @@ runSMPServerTest c (ProtoServerWithAuth srv auth) = do
|
||||
liftError (testErr TSSecureQueue) $ secureSMPQueue smp rpKey rcvId sKey
|
||||
liftError (testErr TSDeleteQueue) $ deleteSMPQueue smp rpKey rcvId
|
||||
ok <- tcpTimeout (networkConfig cfg) `timeout` closeProtocolClient smp
|
||||
pure $ either Just (const Nothing) r <|> maybe (Just (SMPTestFailure TSDisconnect $ BROKER TIMEOUT)) (const Nothing) ok
|
||||
Left e -> pure . Just $ testErr TSConnect e
|
||||
pure $ either Just (const Nothing) r <|> maybe (Just (SMPTestFailure TSDisconnect $ BROKER addr TIMEOUT)) (const Nothing) ok
|
||||
Left e -> pure (Just $ testErr TSConnect e)
|
||||
where
|
||||
addr = B.unpack $ strEncode srv
|
||||
testErr :: SMPTestStep -> ProtocolClientError -> SMPTestFailure
|
||||
testErr step err = SMPTestFailure step $ protocolClientError SMP err
|
||||
testErr step = SMPTestFailure step . protocolClientError SMP addr
|
||||
|
||||
newRcvQueue :: AgentMonad m => AgentClient -> ConnId -> SMPServerWithAuth -> VersionRange -> m (RcvQueue, SMPQueueUri)
|
||||
newRcvQueue c connId (ProtoServerWithAuth srv auth) vRange = do
|
||||
@@ -614,10 +617,15 @@ temporaryClientError = \case
|
||||
|
||||
temporaryAgentError :: AgentErrorType -> Bool
|
||||
temporaryAgentError = \case
|
||||
BROKER NETWORK -> True
|
||||
BROKER TIMEOUT -> True
|
||||
BROKER _ NETWORK -> True
|
||||
BROKER _ TIMEOUT -> True
|
||||
_ -> False
|
||||
|
||||
temporaryOrHostError :: AgentErrorType -> Bool
|
||||
temporaryOrHostError = \case
|
||||
BROKER _ HOST -> True
|
||||
e -> temporaryAgentError e
|
||||
|
||||
-- | subscribe multiple queues - all passed queues should be on the same server
|
||||
subscribeQueues :: AgentMonad m => AgentClient -> SMPServer -> [RcvQueue] -> m (Maybe SMPClient, [(RcvQueue, Either AgentErrorType ())])
|
||||
subscribeQueues c srv qs = do
|
||||
@@ -637,7 +645,7 @@ subscribeQueues c srv qs = do
|
||||
liftIO $ do
|
||||
rs <- zip qs_ . L.toList <$> subscribeSMPQueues smp qs2
|
||||
mapM_ (uncurry $ processSubResult c) rs
|
||||
pure $ map (second . first $ protocolClientError SMP) rs
|
||||
pure $ map (second . first $ protocolClientError SMP $ clientServer smp) rs
|
||||
_ -> pure (Nothing, errs)
|
||||
where
|
||||
checkQueue rq@RcvQueue {rcvId, server} = do
|
||||
@@ -679,14 +687,14 @@ sendConfirmation c sq@SndQueue {server, sndId, sndPublicKey = Just sndPublicKey,
|
||||
withLogClient_ c server sndId "SEND <CONF>" $ \smp -> do
|
||||
let clientMsg = SMP.ClientMessage (SMP.PHConfirmation sndPublicKey) agentConfirmation
|
||||
msg <- agentCbEncrypt sq e2ePubKey $ smpEncode clientMsg
|
||||
liftClient SMP $ sendSMPMessage smp Nothing sndId (SMP.MsgFlags {notification = True}) msg
|
||||
liftClient SMP (clientServer smp) $ sendSMPMessage smp Nothing sndId (SMP.MsgFlags {notification = True}) msg
|
||||
sendConfirmation _ _ _ = throwError $ INTERNAL "sendConfirmation called without snd_queue public key(s) in the database"
|
||||
|
||||
sendInvitation :: forall m. AgentMonad m => AgentClient -> Compatible SMPQueueInfo -> Compatible Version -> ConnectionRequestUri 'CMInvitation -> ConnInfo -> m ()
|
||||
sendInvitation c (Compatible (SMPQueueInfo v SMPQueueAddress {smpServer, senderId, dhPublicKey})) (Compatible agentVersion) connReq connInfo =
|
||||
withLogClient_ c smpServer senderId "SEND <INV>" $ \smp -> do
|
||||
msg <- mkInvitation
|
||||
liftClient SMP $ sendSMPMessage smp Nothing senderId MsgFlags {notification = True} msg
|
||||
liftClient SMP (clientServer smp) $ sendSMPMessage smp Nothing senderId MsgFlags {notification = True} msg
|
||||
where
|
||||
mkInvitation :: m ByteString
|
||||
-- this is only encrypted with per-queue E2E, not with double ratchet
|
||||
@@ -757,7 +765,7 @@ sendAgentMessage c sq@SndQueue {server, sndId, sndPrivateKey} msgFlags agentMsg
|
||||
withLogClient_ c server sndId "SEND <MSG>" $ \smp -> do
|
||||
let clientMsg = SMP.ClientMessage SMP.PHEmpty agentMsg
|
||||
msg <- agentCbEncrypt sq Nothing $ smpEncode clientMsg
|
||||
liftClient SMP $ sendSMPMessage smp (Just sndPrivateKey) sndId msgFlags msg
|
||||
liftClient SMP (clientServer smp) $ sendSMPMessage smp (Just sndPrivateKey) sndId msgFlags msg
|
||||
|
||||
agentNtfRegisterToken :: AgentMonad m => AgentClient -> NtfToken -> C.APublicVerifyKey -> C.PublicKeyX25519 -> m (NtfTokenId, C.PublicKeyX25519)
|
||||
agentNtfRegisterToken c NtfToken {deviceToken, ntfServer, ntfPrivKey} ntfPubKey pubDhKey =
|
||||
|
||||
@@ -310,8 +310,8 @@ retryOnError :: AgentMonad m => AgentClient -> Text -> m () -> (AgentErrorType -
|
||||
retryOnError c name loop done e = do
|
||||
logError $ name <> " error: " <> tshow e
|
||||
case e of
|
||||
BROKER NETWORK -> retryLoop
|
||||
BROKER TIMEOUT -> retryLoop
|
||||
BROKER _ NETWORK -> retryLoop
|
||||
BROKER _ TIMEOUT -> retryLoop
|
||||
_ -> done e
|
||||
where
|
||||
retryLoop = do
|
||||
|
||||
@@ -142,6 +142,7 @@ import Data.Map (Map)
|
||||
import qualified Data.Map as M
|
||||
import Data.Maybe (isJust)
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
import Data.Text.Encoding (encodeUtf8)
|
||||
import Data.Time.Clock (UTCTime)
|
||||
import Data.Time.Clock.System (SystemTime)
|
||||
@@ -1050,7 +1051,7 @@ data AgentErrorType
|
||||
| -- | NTF protocol errors forwarded to agent clients
|
||||
NTF {ntfErr :: ErrorType}
|
||||
| -- | SMP server errors
|
||||
BROKER {brokerErr :: BrokerErrorType}
|
||||
BROKER {brokerAddress :: String, brokerErr :: BrokerErrorType}
|
||||
| -- | errors of other agents
|
||||
AGENT {agentErr :: SMPAgentError}
|
||||
| -- | agent implementation or dependency errors
|
||||
@@ -1144,23 +1145,27 @@ instance StrEncoding AgentErrorType where
|
||||
<|> "CONN " *> (CONN <$> parseRead1)
|
||||
<|> "SMP " *> (SMP <$> strP)
|
||||
<|> "NTF " *> (NTF <$> strP)
|
||||
<|> "BROKER RESPONSE " *> (BROKER . RESPONSE <$> strP)
|
||||
<|> "BROKER TRANSPORT " *> (BROKER . TRANSPORT <$> transportErrorP)
|
||||
<|> "BROKER " *> (BROKER <$> parseRead1)
|
||||
<|> "BROKER " *> (BROKER <$> srvP <* " RESPONSE " <*> (RESPONSE <$> strP))
|
||||
<|> "BROKER " *> (BROKER <$> srvP <* " TRANSPORT " <*> (TRANSPORT <$> transportErrorP))
|
||||
<|> "BROKER " *> (BROKER <$> srvP <* A.space <*> parseRead1)
|
||||
<|> "AGENT QUEUE " *> (AGENT . A_QUEUE <$> parseRead A.takeByteString)
|
||||
<|> "AGENT " *> (AGENT <$> parseRead1)
|
||||
<|> "INTERNAL " *> (INTERNAL <$> parseRead A.takeByteString)
|
||||
where
|
||||
srvP = T.unpack . safeDecodeUtf8 <$> A.takeTill (== ' ')
|
||||
strEncode = \case
|
||||
CMD e -> "CMD " <> bshow e
|
||||
CONN e -> "CONN " <> bshow e
|
||||
SMP e -> "SMP " <> strEncode e
|
||||
NTF e -> "NTF " <> strEncode e
|
||||
BROKER (RESPONSE e) -> "BROKER RESPONSE " <> strEncode e
|
||||
BROKER (TRANSPORT e) -> "BROKER TRANSPORT " <> serializeTransportError e
|
||||
BROKER e -> "BROKER " <> bshow e
|
||||
BROKER srv (RESPONSE e) -> "BROKER " <> addr srv <> " RESPONSE " <> strEncode e
|
||||
BROKER srv (TRANSPORT e) -> "BROKER " <> addr srv <> " TRANSPORT " <> serializeTransportError e
|
||||
BROKER srv e -> "BROKER " <> addr srv <> " " <> bshow e
|
||||
AGENT (A_QUEUE e) -> "AGENT QUEUE " <> bshow e
|
||||
AGENT e -> "AGENT " <> bshow e
|
||||
INTERNAL e -> "INTERNAL " <> bshow e
|
||||
where
|
||||
addr = encodeUtf8 . T.pack
|
||||
|
||||
instance Arbitrary AgentErrorType where arbitrary = genericArbitraryU
|
||||
|
||||
|
||||
@@ -30,6 +30,7 @@ module Simplex.Messaging.Client
|
||||
SMPClient,
|
||||
getProtocolClient,
|
||||
closeProtocolClient,
|
||||
clientServer,
|
||||
|
||||
-- * SMP protocol command functions
|
||||
createSMPQueue,
|
||||
@@ -80,6 +81,7 @@ import GHC.Generics (Generic)
|
||||
import Network.Socket (ServiceName)
|
||||
import Numeric.Natural
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Parsers (dropPrefix, enumJSON)
|
||||
import Simplex.Messaging.Protocol as SMP
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
@@ -212,6 +214,9 @@ chooseTransportHost NetworkConfig {socksProxy, hostMode, requiredHostMode} hosts
|
||||
onionHost = find isOnionHost hosts
|
||||
publicHost = find (not . isOnionHost) hosts
|
||||
|
||||
clientServer :: ProtocolTypeI (ProtoType msg) => ProtocolClient msg -> String
|
||||
clientServer = B.unpack . strEncode . protocolServer
|
||||
|
||||
-- | Connects to 'ProtocolServer' using passed client configuration
|
||||
-- and queue for messages and notifications.
|
||||
--
|
||||
|
||||
Reference in New Issue
Block a user