mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-01 03:05:57 +00:00
Merge branch 'master' into ab/async-subs
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
name: simplexmq
|
||||
version: 5.8.0.9
|
||||
version: 5.8.0.10
|
||||
synopsis: SimpleXMQ message broker
|
||||
description: |
|
||||
This package includes <./docs/Simplex-Messaging-Server.html server>,
|
||||
|
||||
@@ -5,7 +5,7 @@ cabal-version: 1.12
|
||||
-- see: https://github.com/sol/hpack
|
||||
|
||||
name: simplexmq
|
||||
version: 5.8.0.9
|
||||
version: 5.8.0.10
|
||||
synopsis: SimpleXMQ message broker
|
||||
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
|
||||
<./docs/Simplex-Messaging-Client.html client> and
|
||||
|
||||
@@ -98,12 +98,6 @@ defaultXFTPClientConfig =
|
||||
clientALPN = Just supportedXFTPhandshakes
|
||||
}
|
||||
|
||||
http2XFTPClientError :: HTTP2ClientError -> XFTPClientError
|
||||
http2XFTPClientError = \case
|
||||
HCResponseTimeout -> PCEResponseTimeout
|
||||
HCNetworkError -> PCENetworkError
|
||||
HCIOError e -> PCEIOError e
|
||||
|
||||
getXFTPClient :: TransportSession FileResponse -> XFTPClientConfig -> (XFTPClient -> IO ()) -> IO (Either XFTPClientError XFTPClient)
|
||||
getXFTPClient transportSession@(_, srv, _) config@XFTPClientConfig {clientALPN, xftpNetworkConfig, serverVRange} disconnected = runExceptT $ do
|
||||
let username = proxyUsername transportSession
|
||||
@@ -140,7 +134,7 @@ xftpClientHandshakeV1 serverVRange keyHash@(C.KeyHash kh) c@HTTP2Client {session
|
||||
getServerHandshake = do
|
||||
let helloReq = H.requestNoBody "POST" "/" []
|
||||
HTTP2Response {respBody = HTTP2Body {bodyHead = shsBody}} <-
|
||||
liftError' http2XFTPClientError $ sendRequest c helloReq Nothing
|
||||
liftError' xftpClientError $ sendRequest c helloReq Nothing
|
||||
liftTransportErr (TEHandshake PARSE) . smpDecode =<< liftTransportErr TEBadBlock (C.unPad shsBody)
|
||||
processServerHandshake :: XFTPServerHandshake -> ExceptT XFTPClientError IO (VersionRangeXFTP, C.PublicKeyX25519)
|
||||
processServerHandshake XFTPServerHandshake {xftpVersionRange, sessionId = serverSessId, authPubKey = serverAuth} = do
|
||||
@@ -159,7 +153,7 @@ xftpClientHandshakeV1 serverVRange keyHash@(C.KeyHash kh) c@HTTP2Client {session
|
||||
sendClientHandshake chs = do
|
||||
chs' <- liftTransportErr TELargeMsg $ C.pad (smpEncode chs) xftpBlockSize
|
||||
let chsReq = H.requestBuilder "POST" "/" [] $ byteString chs'
|
||||
HTTP2Response {respBody = HTTP2Body {bodyHead}} <- liftError' http2XFTPClientError $ sendRequest c chsReq Nothing
|
||||
HTTP2Response {respBody = HTTP2Body {bodyHead}} <- liftError' xftpClientError $ sendRequest c chsReq Nothing
|
||||
unless (B.null bodyHead) $ throwError $ PCETransportError TEBadBlock
|
||||
liftTransportErr e = liftEitherWith (const $ PCETransportError e)
|
||||
|
||||
|
||||
@@ -479,7 +479,7 @@ processXFTPRequest HTTP2Body {bodyPart} = \case
|
||||
pure $ FRErr e
|
||||
receiveChunk spec = do
|
||||
t <- asks $ fileTimeout . config
|
||||
liftIO $ fromMaybe (Left TIMEOUT) <$> timeout t (runExceptT (receiveFile getBody spec) `catchAll_` pure (Left FILE_IO))
|
||||
liftIO $ fromMaybe (Left TIMEOUT) <$> timeout t (runExceptT $ receiveFile getBody spec)
|
||||
sendServerFile :: FileRec -> RcvPublicDhKey -> M (FileResponse, Maybe ServerFile)
|
||||
sendServerFile FileRec {senderId, filePath, fileInfo = FileInfo {size}} rDhKey = do
|
||||
readTVarIO filePath >>= \case
|
||||
|
||||
@@ -166,7 +166,7 @@ xftpServerCLI cfgPath logPath = do
|
||||
defaultFileExpiration
|
||||
{ ttl = 3600 * readIniDefault defFileExpirationHours "STORE_LOG" "expire_files_hours" ini
|
||||
},
|
||||
fileTimeout = 10 * 60 * 1000000, -- 10 mins to send 4mb chunk
|
||||
fileTimeout = 5 * 60 * 1000000, -- 5 mins to send 4mb chunk
|
||||
inactiveClientExpiration =
|
||||
settingIsOn "INACTIVE_CLIENTS" "disconnect" ini
|
||||
$> ExpirationConfig
|
||||
|
||||
@@ -34,6 +34,7 @@ where
|
||||
|
||||
import Control.Applicative ((<|>))
|
||||
import qualified Control.Exception as E
|
||||
import Control.Logger.Simple
|
||||
import Control.Monad
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.IO.Class
|
||||
@@ -46,8 +47,10 @@ import Data.ByteString.Builder (Builder, byteString)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import qualified Data.ByteString.Lazy.Char8 as LB
|
||||
import Data.Functor (($>))
|
||||
import Data.Word (Word16, Word32)
|
||||
import qualified Data.X509 as X
|
||||
import Network.HTTP2.Client (HTTP2Error)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import qualified Simplex.Messaging.Crypto.Lazy as LC
|
||||
import Simplex.Messaging.Encoding
|
||||
@@ -56,7 +59,7 @@ import Simplex.Messaging.Parsers
|
||||
import Simplex.Messaging.Protocol (CommandError)
|
||||
import Simplex.Messaging.Transport (SessionId, THandle (..), THandleParams (..), TransportError (..), TransportPeer (..))
|
||||
import Simplex.Messaging.Transport.HTTP2.File
|
||||
import Simplex.Messaging.Util (bshow)
|
||||
import Simplex.Messaging.Util (bshow, tshow)
|
||||
import Simplex.Messaging.Version
|
||||
import Simplex.Messaging.Version.Internal
|
||||
import System.IO (Handle, IOMode (..), withFile)
|
||||
@@ -145,9 +148,14 @@ sendEncFile h send = go
|
||||
go sbState' $ sz - fromIntegral (B.length ch)
|
||||
|
||||
receiveFile :: (Int -> IO ByteString) -> XFTPRcvChunkSpec -> ExceptT XFTPErrorType IO ()
|
||||
receiveFile getBody = receiveFile_ receive
|
||||
receiveFile getBody chunk = ExceptT $ runExceptT (receiveFile_ receive chunk) `E.catches` handlers
|
||||
where
|
||||
receive h sz = hReceiveFile getBody h sz >>= \sz' -> pure $ if sz' == 0 then Right () else Left SIZE
|
||||
handlers =
|
||||
[ E.Handler $ \(e :: HTTP2Error) -> logWarn (err e) $> Left TIMEOUT,
|
||||
E.Handler $ \(e :: E.SomeException) -> logError (err e) $> Left FILE_IO
|
||||
]
|
||||
err e = "receiveFile error: " <> tshow e
|
||||
|
||||
receiveEncFile :: (Int -> IO ByteString) -> LC.SbState -> XFTPRcvChunkSpec -> ExceptT XFTPErrorType IO ()
|
||||
receiveEncFile getBody = receiveFile_ . receive
|
||||
@@ -213,7 +221,7 @@ data XFTPErrorType
|
||||
HAS_FILE
|
||||
| -- | file IO error
|
||||
FILE_IO
|
||||
| -- | file sending timeout
|
||||
| -- | file sending or receiving timeout
|
||||
TIMEOUT
|
||||
| -- | bad redirect data
|
||||
REDIRECT {redirectError :: String}
|
||||
|
||||
@@ -188,6 +188,7 @@ import qualified Simplex.FileTransfer.Client as X
|
||||
import Simplex.FileTransfer.Description (ChunkReplicaId (..), FileDigest (..), kb)
|
||||
import Simplex.FileTransfer.Protocol (FileInfo (..), FileResponse)
|
||||
import Simplex.FileTransfer.Transport (XFTPErrorType (DIGEST), XFTPRcvChunkSpec (..), XFTPVersion)
|
||||
import qualified Simplex.FileTransfer.Transport as XFTP
|
||||
import Simplex.FileTransfer.Types (DeletedSndChunkReplica (..), NewSndChunkReplica (..), RcvFileChunkReplica (..), SndFileChunk (..), SndFileChunkReplica (..))
|
||||
import Simplex.FileTransfer.Util (uniqueCombine)
|
||||
import Simplex.Messaging.Agent.Env.SQLite
|
||||
@@ -1307,6 +1308,7 @@ temporaryAgentError :: AgentErrorType -> Bool
|
||||
temporaryAgentError = \case
|
||||
BROKER _ e -> tempBrokerError e
|
||||
SMP _ (SMP.PROXY (SMP.BROKER e)) -> tempBrokerError e
|
||||
XFTP _ XFTP.TIMEOUT -> True
|
||||
PROXY _ _ (ProxyProtocolError (SMP.PROXY (SMP.BROKER e))) -> tempBrokerError e
|
||||
PROXY _ _ (ProxyProtocolError (SMP.PROXY SMP.NO_SESSION)) -> True
|
||||
INACTIVE -> True
|
||||
@@ -2077,12 +2079,13 @@ getAgentQueuesInfo AgentClient {msgQ, subQ, smpClients} = do
|
||||
where
|
||||
getClientQueuesInfo :: SMPClientVar -> IO (Int, UTCTime, ClientInfo)
|
||||
getClientQueuesInfo SessionVar {sessionVar, sessionVarId, sessionVarTs} = do
|
||||
clientInfo <- atomically (tryReadTMVar sessionVar) >>= \case
|
||||
Just (Right c) -> do
|
||||
(sndQInfo, rcvQInfo) <- getProtocolClientQueuesInfo $ protocolClient c
|
||||
pure ClientInfoQueues {sndQInfo, rcvQInfo}
|
||||
Just (Left e) -> pure $ ClientInfoError e
|
||||
Nothing -> pure ClientInfoConnecting
|
||||
clientInfo <-
|
||||
atomically (tryReadTMVar sessionVar) >>= \case
|
||||
Just (Right c) -> do
|
||||
(sndQInfo, rcvQInfo) <- getProtocolClientQueuesInfo $ protocolClient c
|
||||
pure ClientInfoQueues {sndQInfo, rcvQInfo}
|
||||
Just (Left e) -> pure $ ClientInfoError e
|
||||
Nothing -> pure ClientInfoConnecting
|
||||
pure (sessionVarId, sessionVarTs, clientInfo)
|
||||
|
||||
$(J.deriveJSON defaultJSON ''AgentLocks)
|
||||
|
||||
Reference in New Issue
Block a user