servers: better socket leak prevention during TLS handshake, add NetworkError type to better diagnose connection errors (#1619)

* servers: better socket leak prevention during TLS handshake

* log tcp connection errors

* more detailed network error

* log full address

* rename error

* add encodings for NetworkError

* refactor

* comment

* bind

* style

* remove parameters of NETWORK error from encoding
This commit is contained in:
Evgeny
2025-09-02 16:07:37 +01:00
committed by GitHub
parent 0319addd2b
commit cb3250e7b4
12 changed files with 160 additions and 71 deletions

View File

@@ -59,6 +59,8 @@ import Simplex.Messaging.Protocol
RecipientId,
SenderId,
pattern NoEntity,
NetworkError (..),
toNetworkError,
)
import Simplex.Messaging.Transport (ALPN, CertChainPubKey (..), HandshakeError (..), THandleAuth (..), THandleParams (..), TransportError (..), TransportPeer (..), defaultSupportedParams)
import Simplex.Messaging.Transport.Client (TransportClientConfig (..), TransportHost)
@@ -191,7 +193,7 @@ xftpHTTP2Config transportConfig XFTPClientConfig {xftpNetworkConfig = NetworkCon
xftpClientError :: HTTP2ClientError -> XFTPClientError
xftpClientError = \case
HCResponseTimeout -> PCEResponseTimeout
HCNetworkError -> PCENetworkError
HCNetworkError e -> PCENetworkError e
HCIOError e -> PCEIOError e
sendXFTPCommand :: forall p. FilePartyI p => XFTPClient -> C.APrivateAuthKey -> XFTPFileId -> FileCommand p -> Maybe XFTPChunkSpec -> ExceptT XFTPClientError IO (FileResponse, HTTP2Body)
@@ -261,9 +263,9 @@ downloadXFTPChunk g c@XFTPClient {config} rpKey fId chunkSpec@XFTPRcvChunkSpec {
ExceptT (sequence <$> (t `timeout` (download cbState `catches` errors))) >>= maybe (throwE PCEResponseTimeout) pure
where
errors =
[ Handler $ \(_e :: H.HTTP2Error) -> pure $ Left PCENetworkError,
Handler $ \(e :: IOException) -> pure $ Left (PCEIOError e),
Handler $ \(_e :: SomeException) -> pure $ Left PCENetworkError
[ Handler $ \(e :: H.HTTP2Error) -> pure $ Left $ PCENetworkError $ NEConnectError $ displayException e,
Handler $ \(e :: IOException) -> pure $ Left $ PCEIOError e,
Handler $ \(e :: SomeException) -> pure $ Left $ PCENetworkError $ toNetworkError e
]
download cbState =
runExceptT . withExceptT PCEResponseError $

View File

@@ -250,6 +250,7 @@ import Simplex.Messaging.Protocol
EntityId (..),
ServiceId,
ErrorType,
NetworkError (..),
MsgFlags (..),
MsgId,
NtfServer,
@@ -1199,12 +1200,12 @@ protocolClientError protocolError_ host = \case
PCEResponseError e -> BROKER host $ RESPONSE $ B.unpack $ smpEncode e
PCEUnexpectedResponse e -> BROKER host $ UNEXPECTED $ B.unpack e
PCEResponseTimeout -> BROKER host TIMEOUT
PCENetworkError -> BROKER host NETWORK
PCENetworkError e -> BROKER host $ NETWORK e
PCEIncompatibleHost -> BROKER host HOST
PCETransportError e -> BROKER host $ TRANSPORT e
e@PCECryptoError {} -> INTERNAL $ show e
PCEServiceUnavailable {} -> BROKER host NO_SERVICE
PCEIOError {} -> BROKER host NETWORK
PCEIOError e -> BROKER host $ NETWORK $ NEConnectError $ E.displayException e
data ProtocolTestStep
= TSConnect
@@ -1478,7 +1479,7 @@ temporaryAgentError = \case
_ -> False
where
tempBrokerError = \case
NETWORK -> True
NETWORK _ -> True
TIMEOUT -> True
_ -> False
@@ -1518,7 +1519,7 @@ subscribeQueues c qs = do
subscribeQueues_ env session smp qs' = do
let (userId, srv, _) = transportSession' smp
atomically $ incSMPServerStat' c userId srv connSubAttempts $ length qs'
rs <- sendBatch (\smp' _ -> subscribeSMPQueues smp') smp NRMBackground qs'
rs <- sendBatch (\smp' _ -> subscribeSMPQueues smp') smp NRMBackground qs'
active <-
atomically $
ifM
@@ -1529,7 +1530,8 @@ subscribeQueues c qs = do
then when (hasTempErrors rs) resubscribe $> rs
else do
logWarn "subcription batch result for replaced SMP client, resubscribing"
resubscribe $> L.map (second $ \_ -> Left PCENetworkError) rs
-- TODO we probably use PCENetworkError here instead of the original error, so it becomes temporary.
resubscribe $> L.map (second $ Left . PCENetworkError . NESubscribeError . show) rs
where
tSess = transportSession' smp
sessId = sessionId $ thParams smp

View File

@@ -597,12 +597,14 @@ getProtocolClient g nm transportSession@(_, srv, _) cfg@ProtocolClientConfig {qS
socksCreds = clientSocksCredentials networkConfig proxySessTs transportSession
tId <-
runTransportClient tcConfig socksCreds useHost port' (Just $ keyHash srv) (client t c cVar)
`forkFinally` \_ -> void (atomically . tryPutTMVar cVar $ Left PCENetworkError)
`forkFinally` \r ->
let err = either toNetworkError (const NEFailedError) r
in void $ atomically $ tryPutTMVar cVar $ Left $ PCENetworkError err
c_ <- netTimeoutInt tcpConnectTimeout nm `timeout` atomically (takeTMVar cVar)
case c_ of
Just (Right c') -> mkWeakThreadId tId >>= \tId' -> pure $ Right c' {action = Just tId'}
Just (Left e) -> pure $ Left e
Nothing -> killThread tId $> Left PCENetworkError
Nothing -> killThread tId $> Left (PCENetworkError NETimeoutError)
useTransport :: (ServiceName, ATransport 'TClient)
useTransport = case port srv of
@@ -743,7 +745,7 @@ data ProtocolClientError err
PCEResponseTimeout
| -- | Failure to establish TCP connection.
-- Forwarded to the agent client as `ERR BROKER NETWORK`.
PCENetworkError
PCENetworkError NetworkError
| -- | No host compatible with network configuration
PCEIncompatibleHost
| -- | Service is unavailable for command that requires service connection
@@ -761,7 +763,7 @@ type SMPClientError = ProtocolClientError ErrorType
temporaryClientError :: ProtocolClientError err -> Bool
temporaryClientError = \case
PCENetworkError -> True
PCENetworkError _ -> True
PCEResponseTimeout -> True
PCEIOError _ -> True
_ -> False
@@ -782,7 +784,7 @@ smpProxyError = \case
PCEResponseError e -> PROXY $ BROKER $ RESPONSE $ B.unpack $ strEncode e
PCEUnexpectedResponse e -> PROXY $ BROKER $ UNEXPECTED $ B.unpack e
PCEResponseTimeout -> PROXY $ BROKER TIMEOUT
PCENetworkError -> PROXY $ BROKER NETWORK
PCENetworkError e -> PROXY $ BROKER $ NETWORK e
PCEIncompatibleHost -> PROXY $ BROKER HOST
PCEServiceUnavailable -> PROXY $ BROKER $ NO_SERVICE -- for completeness, it cannot happen.
PCETransportError t -> PROXY $ BROKER $ TRANSPORT t

View File

@@ -391,7 +391,7 @@ withSMP ca srv action = (getSMPServerClient' ca srv >>= action) `catchE` logSMPE
where
logSMPError :: SMPClientError -> ExceptT SMPClientError IO a
logSMPError e = do
logInfo $ "SMP error (" <> safeDecodeUtf8 (strEncode $ host srv) <> "): " <> tshow e
logInfo $ "SMP error (" <> safeDecodeUtf8 (strEncode srv) <> "): " <> tshow e
throwE e
subscribeQueuesNtfs :: SMPClientAgent 'NotifierService -> SMPServer -> NonEmpty (NotifierId, NtfPrivateAuthKey) -> IO ()

View File

@@ -613,7 +613,7 @@ ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} =
PCEIncompatibleHost -> Just $ NSErr "IncompatibleHost"
PCEServiceUnavailable -> Just NSService -- this error should not happen on individual subscriptions
PCEResponseTimeout -> Nothing
PCENetworkError -> Nothing
PCENetworkError _ -> Nothing
PCEIOError _ -> Nothing
where
-- Note on moving to PostgreSQL: the idea of logging errors without e is removed here

View File

@@ -81,6 +81,7 @@ module Simplex.Messaging.Protocol
CommandError (..),
ProxyError (..),
BrokerErrorType (..),
NetworkError (..),
BlockingInfo (..),
BlockingReason (..),
RawTransmission,
@@ -168,6 +169,7 @@ module Simplex.Messaging.Protocol
noMsgFlags,
messageId,
messageTs,
toNetworkError,
-- * Parse and serialize
ProtocolMsgTag (..),
@@ -212,7 +214,7 @@ module Simplex.Messaging.Protocol
where
import Control.Applicative (optional, (<|>))
import Control.Exception (Exception)
import Control.Exception (Exception, SomeException, displayException, fromException)
import Control.Monad.Except
import Data.Aeson (FromJSON (..), ToJSON (..))
import qualified Data.Aeson.TH as J
@@ -241,6 +243,7 @@ import GHC.TypeLits (ErrorMessage (..), TypeError, type (+))
import qualified GHC.TypeLits as TE
import qualified GHC.TypeLits as Type
import Network.Socket (ServiceName)
import qualified Network.TLS as TLS
import Simplex.Messaging.Agent.Store.DB (Binary (..), FromField (..), ToField (..))
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding
@@ -1555,7 +1558,7 @@ data BrokerErrorType
| -- | unexpected response
UNEXPECTED {respErr :: String}
| -- | network error
NETWORK
NETWORK {networkError :: NetworkError}
| -- | no compatible server host (e.g. onion when public is required, or vice versa)
HOST
| -- | service unavailable client-side - used in agent errors
@@ -1566,6 +1569,24 @@ data BrokerErrorType
TIMEOUT
deriving (Eq, Read, Show, Exception)
data NetworkError
= NEConnectError {connectError :: String}
| NETLSError {tlsError :: String}
| NEUnknownCAError
| NEFailedError
| NETimeoutError
| NESubscribeError {subscribeError :: String}
deriving (Eq, Read, Show)
toNetworkError :: SomeException -> NetworkError
toNetworkError e = maybe (NEConnectError err) fromTLSError (fromException e)
where
err = displayException e
fromTLSError :: TLS.TLSException -> NetworkError
fromTLSError = \case
TLS.HandshakeFailed (TLS.Error_Protocol _ TLS.UnknownCa) -> NEUnknownCAError
_ -> NETLSError err
data BlockingInfo = BlockingInfo
{ reason :: BlockingReason
}
@@ -2001,7 +2022,7 @@ instance Encoding BrokerErrorType where
RESPONSE e -> "RESPONSE " <> smpEncode e
UNEXPECTED e -> "UNEXPECTED " <> smpEncode e
TRANSPORT e -> "TRANSPORT " <> smpEncode e
NETWORK -> "NETWORK"
NETWORK e -> "NETWORK" -- TODO once all upgrade: "NETWORK " <> smpEncode e
TIMEOUT -> "TIMEOUT"
HOST -> "HOST"
NO_SERVICE -> "NO_SERVICE"
@@ -2010,7 +2031,7 @@ instance Encoding BrokerErrorType where
"RESPONSE" -> RESPONSE <$> _smpP
"UNEXPECTED" -> UNEXPECTED <$> _smpP
"TRANSPORT" -> TRANSPORT <$> _smpP
"NETWORK" -> pure NETWORK
"NETWORK" -> NETWORK <$> (_smpP <|> pure NEFailedError)
"TIMEOUT" -> pure TIMEOUT
"HOST" -> pure HOST
"NO_SERVICE" -> pure NO_SERVICE
@@ -2021,7 +2042,7 @@ instance StrEncoding BrokerErrorType where
RESPONSE e -> "RESPONSE " <> encodeUtf8 (T.pack e)
UNEXPECTED e -> "UNEXPECTED " <> encodeUtf8 (T.pack e)
TRANSPORT e -> "TRANSPORT " <> smpEncode e
NETWORK -> "NETWORK"
NETWORK e -> "NETWORK" -- TODO once all upgrade: "NETWORK " <> strEncode e
TIMEOUT -> "TIMEOUT"
HOST -> "HOST"
NO_SERVICE -> "NO_SERVICE"
@@ -2030,13 +2051,50 @@ instance StrEncoding BrokerErrorType where
"RESPONSE" -> RESPONSE <$> _textP
"UNEXPECTED" -> UNEXPECTED <$> _textP
"TRANSPORT" -> TRANSPORT <$> _smpP
"NETWORK" -> pure NETWORK
"NETWORK" -> NETWORK <$> (_strP <|> pure NEFailedError)
"TIMEOUT" -> pure TIMEOUT
"HOST" -> pure HOST
"NO_SERVICE" -> pure NO_SERVICE
_ -> fail "bad BrokerErrorType"
where
_textP = A.space *> (T.unpack . safeDecodeUtf8 <$> A.takeByteString)
instance Encoding NetworkError where
smpEncode = \case
NEConnectError e -> "CONNECT " <> smpEncode e
NETLSError e -> "TLS " <> smpEncode e
NEUnknownCAError -> "UNKNOWNCA"
NEFailedError -> "FAILED"
NETimeoutError -> "TIMEOUT"
NESubscribeError e -> "SUBSCRIBE " <> smpEncode e
smpP =
A.takeTill (== ' ') >>= \case
"CONNECT" -> NEConnectError <$> _smpP
"TLS" -> NETLSError <$> _smpP
"UNKNOWNCA" -> pure NEUnknownCAError
"FAILED" -> pure NEFailedError
"TIMEOUT" -> pure NETimeoutError
"SUBSCRIBE" -> NESubscribeError <$> _smpP
_ -> fail "bad NetworkError"
instance StrEncoding NetworkError where
strEncode = \case
NEConnectError e -> "CONNECT " <> encodeUtf8 (T.pack e)
NETLSError e -> "TLS " <> encodeUtf8 (T.pack e)
NEUnknownCAError -> "UNKNOWNCA"
NEFailedError -> "FAILED"
NETimeoutError -> "TIMEOUT"
NESubscribeError e -> "SUBSCRIBE " <> encodeUtf8 (T.pack e)
strP =
A.takeTill (== ' ') >>= \case
"CONNECT" -> NEConnectError <$> _textP
"TLS" -> NETLSError <$> _textP
"UNKNOWNCA" -> pure NEUnknownCAError
"FAILED" -> pure NEFailedError
"TIMEOUT" -> pure NETimeoutError
"SUBSCRIBE" -> NESubscribeError <$> _textP
_ -> fail "bad NetworkError"
_textP :: Parser String
_textP = A.space *> (T.unpack . safeDecodeUtf8 <$> A.takeByteString)
-- | Send signed SMP transmission to TCP transport.
tPut :: Transport c => THandle v c p -> NonEmpty (Either TransportError SentRawTransmission) -> IO [Either TransportError ()]
@@ -2200,6 +2258,8 @@ $(J.deriveJSON defaultJSON ''MsgFlags)
$(J.deriveJSON (sumTypeJSON id) ''CommandError)
$(J.deriveJSON (sumTypeJSON $ dropPrefix "NE") ''NetworkError)
$(J.deriveJSON (sumTypeJSON id) ''BrokerErrorType)
$(J.deriveJSON defaultJSON ''BlockingInfo)

View File

@@ -14,7 +14,7 @@ import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.Int (Int64)
import Data.Maybe (isJust)
import Data.Text (Text)
import Simplex.Messaging.Agent.Protocol (ConnectionLink, ConnectionMode (..), ConnectionRequestUri)
import Simplex.Messaging.Agent.Protocol (ConnectionLink, ConnectionMode (..))
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Parsers (defaultJSON, dropPrefix, enumJSON)

View File

@@ -30,12 +30,14 @@ where
import Control.Applicative (optional, (<|>))
import Control.Logger.Simple (logError)
import Control.Monad
import Data.Aeson (FromJSON (..), ToJSON (..))
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Char (isAsciiLower, isDigit, isHexDigit)
import Data.Default (def)
import Data.Functor (($>))
import Data.IORef
import Data.IP
import Data.List.NonEmpty (NonEmpty (..))
@@ -58,7 +60,7 @@ import Simplex.Messaging.Parsers (parseAll, parseString)
import Simplex.Messaging.Transport
import Simplex.Messaging.Transport.KeepAlive
import Simplex.Messaging.Transport.Shared
import Simplex.Messaging.Util (bshow, catchAll, tshow, (<$?>))
import Simplex.Messaging.Util (bshow, catchAll, catchAll_, tshow, (<$?>))
import System.IO.Error
import Text.Read (readMaybe)
import UnliftIO.Exception (IOException)
@@ -156,6 +158,11 @@ clientTransportConfig TransportClientConfig {logTLSErrors} =
runTransportClient :: Transport c => TransportClientConfig -> Maybe SocksCredentials -> TransportHost -> ServiceName -> Maybe C.KeyHash -> (c 'TClient -> IO a) -> IO a
runTransportClient = runTLSTransportClient defaultSupportedParams Nothing
data ConnectionHandle c
= CHSocket Socket
| CHContext T.Context
| CHTransport (c 'TClient)
runTLSTransportClient :: Transport c => T.Supported -> Maybe XS.CertificateStore -> TransportClientConfig -> Maybe SocksCredentials -> TransportHost -> ServiceName -> Maybe C.KeyHash -> (c 'TClient -> IO a) -> IO a
runTLSTransportClient tlsParams caStore_ cfg@TransportClientConfig {socksProxy, tcpKeepAlive, clientCredentials, clientALPN, useSNI} socksCreds host port keyHash client = do
serverCert <- newEmptyTMVarIO
@@ -165,17 +172,22 @@ runTLSTransportClient tlsParams caStore_ cfg@TransportClientConfig {socksProxy,
connectTCP = case socksProxy of
Just proxy -> connectSocksClient proxy socksCreds (hostAddr host)
_ -> connectTCPClient hostName
c <- do
sock <- connectTCP port
mapM_ (setSocketKeepAlive sock) tcpKeepAlive `catchAll` \e -> logError ("Error setting TCP keep-alive" <> tshow e)
h <- newIORef Nothing
let set hc = (>>= \c -> writeIORef h (Just $ hc c) $> c)
E.bracket (set CHSocket $ connectTCP port) (\_ -> closeConn h) $ \sock -> do
mapM_ (setSocketKeepAlive sock) tcpKeepAlive `catchAll` \e -> logError ("Error setting TCP keep-alive " <> tshow e)
let tCfg = clientTransportConfig cfg
-- No TLS timeout to avoid failing connections via SOCKS
tls <- connectTLS (Just hostName) tCfg clientParams sock
chain <- takePeerCertChain serverCert `E.onException` closeTLS tls
tls <- set CHContext $ connectTLS (Just hostName) tCfg clientParams sock
chain <- takePeerCertChain serverCert
sent <- readIORef clientCredsSent
getTransportConnection tCfg sent chain tls
client c `E.finally` closeConnection c
client =<< set CHTransport (getTransportConnection tCfg sent chain tls)
where
closeConn = readIORef >=> mapM_ (\c -> E.uninterruptibleMask_ $ closeConn_ c `catchAll_` pure ())
closeConn_ = \case
CHSocket sock -> close sock
CHContext tls -> closeTLS tls
CHTransport c -> closeConnection c
hostAddr = \case
THIPv4 addr -> SocksAddrIPV4 $ tupleToHostAddress addr
THIPv6 addr -> SocksAddrIPV6 addr
@@ -199,10 +211,11 @@ connectTCPClient host port = withSocketsDo $ resolve >>= tryOpen err
E.try (open addr) >>= either (`tryOpen` as) pure
open :: AddrInfo -> IO Socket
open addr = do
sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr)
connect sock $ addrAddress addr
pure sock
open addr =
E.bracketOnError
(socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr))
close
(\sock -> connect sock (addrAddress addr) $> sock)
defaultSMPPort :: PortNumber
defaultSMPPort = 5223

View File

@@ -27,6 +27,7 @@ import qualified Network.TLS as T
import Numeric.Natural (Natural)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (NetworkError (..), toNetworkError)
import Simplex.Messaging.Transport (ALPN, STransportPeer (..), SessionId, TLS (tlsALPN, tlsPeerCert, tlsUniq), TransportPeer (..), TransportPeerI (..), getServerVerifyKey)
import Simplex.Messaging.Transport.Client (TransportClientConfig (..), TransportHost (..), defaultTcpConnectTimeout, runTLSTransportClient)
import Simplex.Messaging.Transport.HTTP2
@@ -89,7 +90,7 @@ defaultHTTP2ClientConfig =
suportedTLSParams = http2TLSParams
}
data HTTP2ClientError = HCResponseTimeout | HCNetworkError | HCIOError IOException
data HTTP2ClientError = HCResponseTimeout | HCNetworkError NetworkError | HCIOError IOException
deriving (Show)
getHTTP2Client :: HostName -> ServiceName -> Maybe XS.CertificateStore -> HTTP2ClientConfig -> IO () -> IO (Either HTTP2ClientError HTTP2Client)
@@ -121,12 +122,15 @@ getVerifiedHTTP2ClientWith config host port disconnected setup =
runClient :: HClient -> IO (Either HTTP2ClientError HTTP2Client)
runClient c = do
cVar <- newEmptyTMVarIO
action <- async $ setup (client c cVar) `E.finally` atomically (putTMVar cVar $ Left HCNetworkError)
action <-
async $ setup (client c cVar) `E.catch` \e -> do
atomically $ putTMVar cVar $ Left $ HCNetworkError $ toNetworkError e
E.throwIO e
c_ <- connTimeout config `timeout` atomically (takeTMVar cVar)
case c_ of
Just (Right c') -> pure $ Right c' {action = Just action}
Just (Left e) -> pure $ Left e
Nothing -> cancel action $> Left HCNetworkError
Nothing -> cancel action $> Left (HCNetworkError NETimeoutError)
client :: HClient -> TMVar (Either HTTP2ClientError HTTP2Client) -> TLS p -> H.Client HTTP2Response
client c cVar tls sendReq = do
@@ -176,7 +180,7 @@ sendRequestDirect HTTP2Client {client_ = HClient {config, disconnected}, sendReq
reqTimeout `timeout` try (sendReq req process) >>= \case
Just (Right r) -> pure $ Right r
Just (Left e) -> disconnected $> Left (HCIOError e)
Nothing -> pure $ Left HCNetworkError
Nothing -> pure $ Left HCResponseTimeout
where
process r = do
respBody <- getHTTP2Body r $ bodyHeadSize config

View File

@@ -98,7 +98,7 @@ import qualified Simplex.Messaging.Crypto.Ratchet as CR
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Notifications.Transport (NTFVersion, pattern VersionNTF)
import Simplex.Messaging.Protocol (BasicAuth, ErrorType (..), MsgBody, ProtocolServer (..), SubscriptionMode (..), initialSMPClientVersion, srvHostnamesSMPClientVersion, supportedSMPClientVRange)
import Simplex.Messaging.Protocol (BasicAuth, ErrorType (..), MsgBody, NetworkError (..), ProtocolServer (..), SubscriptionMode (..), initialSMPClientVersion, srvHostnamesSMPClientVersion, supportedSMPClientVRange)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Server.Env.STM (AStoreType (..), ServerConfig (..), ServerStoreCfg (..), StorePaths (..))
import Simplex.Messaging.Server.Expiration
@@ -177,7 +177,7 @@ pGet' c skipWarn = do
case cmd of
CONNECT {} -> pGet c
DISCONNECT {} -> pGet c
ERR (BROKER _ NETWORK) -> pGet c
ERR (BROKER _ (NETWORK _)) -> pGet c
MWARN {} | skipWarn -> pGet c
RFWARN {} | skipWarn -> pGet c
SFWARN {} | skipWarn -> pGet c
@@ -516,7 +516,7 @@ functionalAPITests ps = do
it "should pass without basic auth" $ testSMPServerConnectionTest ps Nothing (noAuthSrv testSMPServer2) `shouldReturn` Nothing
let srv1 = testSMPServer2 {keyHash = "1234"}
it "should fail with incorrect fingerprint" $ do
testSMPServerConnectionTest ps Nothing (noAuthSrv srv1) `shouldReturn` Just (ProtocolTestFailure TSConnect $ BROKER (B.unpack $ strEncode srv1) NETWORK)
testSMPServerConnectionTest ps Nothing (noAuthSrv srv1) `shouldReturn` Just (ProtocolTestFailure TSConnect $ BROKER (B.unpack $ strEncode srv1) $ NETWORK NEUnknownCAError)
describe "server with password" $ do
let auth = Just "abcd"
srv = ProtoServerWithAuth testSMPServer2
@@ -1105,7 +1105,7 @@ testAsyncServerOffline ps = withAgentClients2 $ \alice bob -> do
(bobId, cReq) <- withSmpServerStoreLogOn ps testPort $ \_ ->
runRight $ createConnection alice 1 True SCMInvitation Nothing SMSubscribe
-- connection fails
Left (BROKER _ NETWORK) <- runExceptT $ joinConnection bob 1 True cReq "bob's connInfo" SMSubscribe
Left (BROKER _ (NETWORK _)) <- runExceptT $ joinConnection bob 1 True cReq "bob's connInfo" SMSubscribe
("", "", DOWN srv conns) <- nGet alice
srv `shouldBe` testSMPServer
conns `shouldBe` [bobId]
@@ -1172,13 +1172,13 @@ testInvitationErrors ps restart = do
("", "", DOWN _ [_]) <- nGet a
aId <- runRight $ A.prepareConnectionToJoin b 1 True cReq PQSupportOn
-- fails to secure the queue on testPort
BROKER srv NETWORK <- runLeft $ A.joinConnection b NRMInteractive 1 aId True cReq "bob's connInfo" PQSupportOn SMSubscribe
BROKER srv (NETWORK _) <- runLeft $ A.joinConnection b NRMInteractive 1 aId True cReq "bob's connInfo" PQSupportOn SMSubscribe
(testPort `isSuffixOf` srv) `shouldBe` True
withServer1 ps $ do
("", "", UP _ [_]) <- nGet a
let loopSecure = do
-- secures the queue on testPort, but fails to create reply queue on testPort2
BROKER srv2 NETWORK <- runLeft $ A.joinConnection b NRMInteractive 1 aId True cReq "bob's connInfo" PQSupportOn SMSubscribe
BROKER srv2 (NETWORK _) <- runLeft $ A.joinConnection b NRMInteractive 1 aId True cReq "bob's connInfo" PQSupportOn SMSubscribe
unless (testPort2 `isSuffixOf` srv2) $ putStrLn "retrying secure" >> threadDelay 200000 >> loopSecure
loopSecure
("", "", DOWN _ [_]) <- nGet a
@@ -1186,7 +1186,7 @@ testInvitationErrors ps restart = do
threadDelay 200000
let loopCreate = do
-- creates the reply queue on testPort2, but fails to send it to testPort
BROKER srv' NETWORK <- runLeft $ A.joinConnection b NRMInteractive 1 aId True cReq "bob's connInfo" PQSupportOn SMSubscribe
BROKER srv' (NETWORK _) <- runLeft $ A.joinConnection b NRMInteractive 1 aId True cReq "bob's connInfo" PQSupportOn SMSubscribe
unless (testPort `isSuffixOf` srv') $ putStrLn "retrying create" >> threadDelay 200000 >> loopCreate
loopCreate
restartAgentB restart b [aId]
@@ -1242,12 +1242,12 @@ testContactErrors ps restart = do
("", "", DOWN _ [_]) <- nGet a
aId <- runRight $ A.prepareConnectionToJoin b 1 True cReq PQSupportOn
-- fails to create queue on testPort2
BROKER srv2 NETWORK <- runLeft $ A.joinConnection b NRMInteractive 1 aId True cReq "bob's connInfo" PQSupportOn SMSubscribe
BROKER srv2 (NETWORK _) <- runLeft $ A.joinConnection b NRMInteractive 1 aId True cReq "bob's connInfo" PQSupportOn SMSubscribe
(testPort2 `isSuffixOf` srv2) `shouldBe` True
b' <- restartAgentB restart b [aId]
let loopCreate2 = do
-- creates the reply queue on testPort2, but fails to send invitation to testPort
BROKER srv' NETWORK <- runLeft $ A.joinConnection b' NRMInteractive 1 aId True cReq "bob's connInfo" PQSupportOn SMSubscribe
BROKER srv' (NETWORK _) <- runLeft $ A.joinConnection b' NRMInteractive 1 aId True cReq "bob's connInfo" PQSupportOn SMSubscribe
unless (testPort `isSuffixOf` srv') $ putStrLn "retrying create 2" >> threadDelay 200000 >> loopCreate2
b'' <- withServer2 ps $ do
loopCreate2
@@ -1270,7 +1270,7 @@ testContactErrors ps restart = do
("", "", UP _ [_]) <- nGet b''
let loopSecure = do
-- secures the queue on testPort2, but fails to create reply queue on testPort
BROKER srv NETWORK <- runLeft $ acceptContact a 1 bId True invId "alice's connInfo" PQSupportOn SMSubscribe
BROKER srv (NETWORK _) <- runLeft $ acceptContact a 1 bId True invId "alice's connInfo" PQSupportOn SMSubscribe
unless (testPort `isSuffixOf` srv) $ putStrLn "retrying secure" >> threadDelay 200000 >> loopSecure
loopSecure
("", "", DOWN _ [_]) <- nGet b''
@@ -1278,7 +1278,7 @@ testContactErrors ps restart = do
("", "", UP _ [_]) <- nGet a
let loopCreate = do
-- creates the reply queue on testPort, but fails to send confirmation to testPort2
BROKER srv2' NETWORK <- runLeft $ acceptContact a 1 bId True invId "alice's connInfo" PQSupportOn SMSubscribe
BROKER srv2' (NETWORK _) <- runLeft $ acceptContact a 1 bId True invId "alice's connInfo" PQSupportOn SMSubscribe
unless (testPort2 `isSuffixOf` srv2') $ putStrLn "retrying create" >> threadDelay 200000 >> loopCreate
loopCreate
restartAgentA restart a [contactId, bId]
@@ -1743,7 +1743,7 @@ testDuplicateMessage ps = do
-- commenting two lines below and uncommenting further two lines would also runRight_,
-- it is the scenario tested above, when the message was not acknowledged by the user
threadDelay 200000
Left (BROKER _ NETWORK) <- runExceptT $ ackMessage bob1 aliceId 3 Nothing
Left (BROKER _ (NETWORK _)) <- runExceptT $ ackMessage bob1 aliceId 3 Nothing
disposeAgentClient alice
disposeAgentClient bob1
@@ -1827,8 +1827,8 @@ testDeliveryAfterSubscriptionError ps = do
pure (aId, bId)
withAgentClients2 $ \a b -> do
Left (BROKER _ NETWORK) <- runExceptT $ subscribeConnection a bId
Left (BROKER _ NETWORK) <- runExceptT $ subscribeConnection b aId
Left (BROKER _ (NETWORK _)) <- runExceptT $ subscribeConnection a bId
Left (BROKER _ (NETWORK _)) <- runExceptT $ subscribeConnection b aId
withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do
withUP a bId $ \case ("", c, SENT 2) -> c == bId; _ -> False
withUP b aId $ \case ("", c, Msg "hello") -> c == aId; _ -> False
@@ -1872,7 +1872,7 @@ testExpireMessage ps =
2 <- runRight $ sendMessage a bId SMP.noMsgFlags "1"
threadDelay 1500000
3 <- runRight $ sendMessage a bId SMP.noMsgFlags "2" -- this won't expire
get a =##> \case ("", c, MERR 2 (BROKER _ e)) -> bId == c && (e == TIMEOUT || e == NETWORK); _ -> False
get a =##> \case ("", c, MERR 2 (BROKER _ e)) -> bId == c && networkOrTimeoutError e; _ -> False
withSmpServerStoreLogOn ps testPort $ \_ -> runRight_ $ do
withUP a bId $ \case ("", _, SENT 3) -> True; _ -> False
withUP b aId $ \case ("", _, MsgErr 2 (MsgSkipped 2 2) "2") -> True; _ -> False
@@ -1891,8 +1891,8 @@ testExpireManyMessages ps =
4 <- sendMessage a bId SMP.noMsgFlags "3"
liftIO $ threadDelay 2000000
5 <- sendMessage a bId SMP.noMsgFlags "4" -- this won't expire
get a =##> \case ("", c, MERR 2 (BROKER _ e)) -> bId == c && (e == TIMEOUT || e == NETWORK); _ -> False
let expected c e = bId == c && (e == TIMEOUT || e == NETWORK)
get a =##> \case ("", c, MERR 2 (BROKER _ e)) -> bId == c && networkOrTimeoutError e; _ -> False
let expected c e = bId == c && networkOrTimeoutError e
get a >>= \case
("", c, MERR 3 (BROKER _ e)) -> do
liftIO $ expected c e `shouldBe` True
@@ -2633,7 +2633,7 @@ testDeleteConnectionAsync ps =
runRight_ $ do
deleteConnectionsAsync a False connIds
nGet a =##> \case ("", "", DOWN {}) -> True; _ -> False
let delOk = \case (c, _, _, Just (BROKER _ e)) -> c `elem` connIds && (e == TIMEOUT || e == NETWORK); _ -> False
let delOk = \case (c, _, _, Just (BROKER _ e)) -> c `elem` connIds && networkOrTimeoutError e; _ -> False
get a =##> \case ("", "", DEL_RCVQS rs) -> length rs == 3 && all delOk rs; _ -> False
get a =##> \case ("", "", DEL_CONNS cs) -> length cs == 3 && all (`elem` connIds) cs; _ -> False
liftIO $ noMessages a "nothing else should be delivered to alice"
@@ -2691,7 +2691,7 @@ testWaitDelivery ps =
3 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "how are you?"
4 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "message 1"
deleteConnectionsAsync alice True [bobId]
get alice =##> \case ("", "", DEL_RCVQS [(cId, _, _, Just (BROKER _ e))]) -> cId == bobId && (e == TIMEOUT || e == NETWORK); _ -> False
get alice =##> \case ("", "", DEL_RCVQS [(cId, _, _, Just (BROKER _ e))]) -> cId == bobId && networkOrTimeoutError e; _ -> False
liftIO $ noMessages alice "nothing else should be delivered to alice"
liftIO $ noMessages bob "nothing else should be delivered to bob"
@@ -2748,7 +2748,7 @@ testWaitDeliveryAUTHErr ps =
3 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "how are you?"
4 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "message 1"
deleteConnectionsAsync alice True [bobId]
get alice =##> \case ("", "", DEL_RCVQS [(cId, _, _, Just (BROKER _ e))]) -> cId == bobId && (e == TIMEOUT || e == NETWORK); _ -> False
get alice =##> \case ("", "", DEL_RCVQS [(cId, _, _, Just (BROKER _ e))]) -> cId == bobId && networkOrTimeoutError e; _ -> False
liftIO $ noMessages alice "nothing else should be delivered to alice"
liftIO $ noMessages bob "nothing else should be delivered to bob"
@@ -2788,7 +2788,7 @@ testWaitDeliveryTimeout ps =
3 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "how are you?"
4 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "message 1"
deleteConnectionsAsync alice True [bobId]
get alice =##> \case ("", "", DEL_RCVQS [(cId, _, _, Just (BROKER _ e))]) -> cId == bobId && (e == TIMEOUT || e == NETWORK); _ -> False
get alice =##> \case ("", "", DEL_RCVQS [(cId, _, _, Just (BROKER _ e))]) -> cId == bobId && networkOrTimeoutError e; _ -> False
get alice =##> \case ("", "", DEL_CONNS [cId]) -> cId == bobId; _ -> False
liftIO $ noMessages alice "nothing else should be delivered to alice"
liftIO $ noMessages bob "nothing else should be delivered to bob"
@@ -2828,7 +2828,7 @@ testWaitDeliveryTimeout2 ps =
3 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "how are you?"
4 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "message 1"
deleteConnectionsAsync alice True [bobId]
get alice =##> \case ("", "", DEL_RCVQS [(cId, _, _, Just (BROKER _ e))]) -> cId == bobId && (e == TIMEOUT || e == NETWORK); _ -> False
get alice =##> \case ("", "", DEL_RCVQS [(cId, _, _, Just (BROKER _ e))]) -> cId == bobId && networkOrTimeoutError e; _ -> False
get alice =##> \case ("", "", DEL_CONNS [cId]) -> cId == bobId; _ -> False
liftIO $ noMessages alice "nothing else should be delivered to alice"
liftIO $ noMessages bob "nothing else should be delivered to bob"
@@ -2849,6 +2849,12 @@ testWaitDeliveryTimeout2 ps =
baseId = 1
msgId = subtract baseId
networkOrTimeoutError :: BrokerErrorType -> Bool
networkOrTimeoutError = \case
TIMEOUT -> True
NETWORK _ -> True
_ -> False
testJoinConnectionAsyncReplyErrorV8 :: HasCallStack => (ASrvTransport, AStoreType) -> IO ()
testJoinConnectionAsyncReplyErrorV8 ps@(t, ASType qsType _) = do
let initAgentServersSrv2 = initAgentServers {smp = userServers [testSMPServer2]}
@@ -2975,7 +2981,7 @@ testUsersNoServer ps = withAgentClientsCfg2 aCfg agentCfg $ \a b -> do
nGet b =##> \case ("", "", DOWN _ cs) -> length cs == 2; _ -> False
runRight_ $ do
deleteUser a auId True
get a =##> \case ("", "", DEL_RCVQS [(c, _, _, Just (BROKER _ e))]) -> c == bId' && (e == TIMEOUT || e == NETWORK); _ -> False
get a =##> \case ("", "", DEL_RCVQS [(c, _, _, Just (BROKER _ e))]) -> c == bId' && networkOrTimeoutError e;; _ -> False
get a =##> \case ("", "", DEL_CONNS [c]) -> c == bId'; _ -> False
nGet a =##> \case ("", "", DEL_USER u) -> u == auId; _ -> False
liftIO $ noMessages a "nothing else should be delivered to alice"
@@ -3639,7 +3645,7 @@ testServerMultipleIdentities =
exchangeGreetings alice bobId bob aliceId
-- this saves queue with second server identity
bob' <- liftIO $ do
Left (BROKER _ NETWORK) <- runExceptT $ joinConnection bob 1 True secondIdentityCReq "bob's connInfo" SMSubscribe
Left (BROKER _ (NETWORK _)) <- runExceptT $ joinConnection bob 1 True secondIdentityCReq "bob's connInfo" SMSubscribe
disposeAgentClient bob
threadDelay 250000
getSMPAgentClient' 3 agentCfg initAgentServers testDB2

View File

@@ -79,7 +79,7 @@ import Simplex.Messaging.Notifications.Server.Push.APNS
import Simplex.Messaging.Notifications.Server.Store.Postgres (closeNtfDbStore, newNtfDbStore, withDB')
import Simplex.Messaging.Notifications.Types (NtfTknAction (..), NtfToken (..))
import Simplex.Messaging.Parsers (parseAll)
import Simplex.Messaging.Protocol (ErrorType (AUTH), MsgFlags (MsgFlags), NMsgMeta (..), NtfServer, ProtocolServer (..), SMPMsgMeta (..), SubscriptionMode (..))
import Simplex.Messaging.Protocol (ErrorType (AUTH), NetworkError (..), MsgFlags (MsgFlags), NMsgMeta (..), NtfServer, ProtocolServer (..), SMPMsgMeta (..), SubscriptionMode (..))
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Server.Env.STM (AStoreType (..), ServerConfig (..))
import Simplex.Messaging.Transport (ASrvTransport)
@@ -137,7 +137,7 @@ notificationTests ps@(t, _) = do
it "should pass" $ testRunNTFServerTests t testNtfServer `shouldReturn` Nothing
let srv1 = testNtfServer {keyHash = "1234"}
it "should fail with incorrect fingerprint" $ do
testRunNTFServerTests t srv1 `shouldReturn` Just (ProtocolTestFailure TSConnect $ BROKER (B.unpack $ strEncode srv1) NETWORK)
testRunNTFServerTests t srv1 `shouldReturn` Just (ProtocolTestFailure TSConnect $ BROKER (B.unpack $ strEncode srv1) $ NETWORK NEUnknownCAError)
describe "Managing notification subscriptions" $ do
describe "should create notification subscription for existing connection" $
testNtfMatrix ps testNotificationSubscriptionExistingConnection
@@ -321,7 +321,7 @@ testNtfTokenServerRestartReverify t apns = do
runRight_ $ do
verification <- ntfData .-> "verification"
nonce <- C.cbNonce <$> ntfData .-> "nonce"
Left (BROKER _ NETWORK) <- tryE $ verifyNtfToken a tkn nonce verification
Left (BROKER _ (NETWORK _)) <- tryE $ verifyNtfToken a tkn nonce verification
pure ()
threadDelay 1500000
withAgent 2 agentCfg initAgentServers testDB $ \a' ->
@@ -478,7 +478,7 @@ testNtfTokenChangeServers t apns =
tkn2 <- registerTestToken a "xyzw" NMInstant apns
getTestNtfTokenPort a >>= \port -> liftIO $ port `shouldBe` ntfTestPort -- not yet changed
deleteNtfToken a tkn2 -- force server switch
Left BROKER {brokerErr = NETWORK} <- tryError $ registerTestToken a "qwer" NMInstant apns -- ok, it's down for now
Left BROKER {brokerErr = (NETWORK _)} <- tryError $ registerTestToken a "qwer" NMInstant apns -- ok, it's down for now
getTestNtfTokenPort a >>= \port2 -> liftIO $ port2 `shouldBe` ntfTestPort2 -- but the token got updated
killThread ntf
withNtfServerOn t ntfTestPort2 ntfTestDBCfg2 $ runRight_ $ do

View File

@@ -38,7 +38,7 @@ import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.File (CryptoFile (..), CryptoFileArgs)
import qualified Simplex.Messaging.Crypto.File as CF
import Simplex.Messaging.Encoding.String (StrEncoding (..))
import Simplex.Messaging.Protocol (BasicAuth, ProtoServerWithAuth (..), ProtocolServer (..), XFTPServerWithAuth)
import Simplex.Messaging.Protocol (BasicAuth, NetworkError (..), ProtoServerWithAuth (..), ProtocolServer (..), XFTPServerWithAuth)
import Simplex.Messaging.Server.Expiration (ExpirationConfig (..))
import Simplex.Messaging.Util (tshow)
import System.Directory (doesDirectoryExist, doesFileExist, getFileSize, listDirectory, removeFile)
@@ -84,7 +84,7 @@ xftpAgentTests =
it "should pass without basic auth" $ testXFTPServerTest Nothing (noAuthSrv testXFTPServer2) `shouldReturn` Nothing
let srv1 = testXFTPServer2 {keyHash = "1234"}
it "should fail with incorrect fingerprint" $ do
testXFTPServerTest Nothing (noAuthSrv srv1) `shouldReturn` Just (ProtocolTestFailure TSConnect $ BROKER (B.unpack $ strEncode srv1) NETWORK)
testXFTPServerTest Nothing (noAuthSrv srv1) `shouldReturn` Just (ProtocolTestFailure TSConnect $ BROKER (B.unpack $ strEncode srv1) $ NETWORK NEUnknownCAError)
describe "server with password" $ do
let auth = Just "abcd"
srv = ProtoServerWithAuth testXFTPServer2