mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-24 19:05:24 +00:00
Merge branch 'master' into rcv-services
This commit is contained in:
@@ -47,7 +47,7 @@ import Data.Map.Strict (Map)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (fromMaybe, mapMaybe)
|
||||
import qualified Data.Set as S
|
||||
import Data.Text (Text)
|
||||
import Data.Text (Text, pack)
|
||||
import Data.Time.Clock (getCurrentTime)
|
||||
import Data.Time.Format (defaultTimeLocale, formatTime)
|
||||
import Simplex.FileTransfer.Chunks (toKB)
|
||||
@@ -433,7 +433,7 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
|
||||
encryptFileForUpload :: SndFile -> FilePath -> AM (FileDigest, [(XFTPChunkSpec, FileDigest)])
|
||||
encryptFileForUpload SndFile {key, nonce, srcFile, redirect} fsEncPath = do
|
||||
let CryptoFile {filePath} = srcFile
|
||||
fileName = takeFileName filePath
|
||||
fileName = pack $ takeFileName filePath
|
||||
fileSize <- liftIO $ fromInteger <$> CF.getFileContentsSize srcFile
|
||||
when (fileSize > maxFileSizeHard) $ throwE $ FILE FT.SIZE
|
||||
let fileHdr = smpEncode FileHeader {fileName, fileExtra = Nothing}
|
||||
|
||||
@@ -1,4 +1,14 @@
|
||||
module Simplex.FileTransfer.Chunks where
|
||||
module Simplex.FileTransfer.Chunks
|
||||
( serverChunkSizes,
|
||||
chunkSize0,
|
||||
chunkSize1,
|
||||
chunkSize2,
|
||||
chunkSize3,
|
||||
kb,
|
||||
toKB,
|
||||
mb,
|
||||
gb,
|
||||
) where
|
||||
|
||||
import Data.Word (Word32)
|
||||
|
||||
|
||||
@@ -9,7 +9,28 @@
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
|
||||
module Simplex.FileTransfer.Client where
|
||||
module Simplex.FileTransfer.Client
|
||||
( XFTPClient (..),
|
||||
XFTPClientConfig (..),
|
||||
XFTPChunkSpec (..),
|
||||
XFTPClientError,
|
||||
defaultXFTPClientConfig,
|
||||
getXFTPClient,
|
||||
closeXFTPClient,
|
||||
xftpClientServer,
|
||||
xftpTransportHost,
|
||||
createXFTPChunk,
|
||||
addXFTPRecipients,
|
||||
uploadXFTPChunk,
|
||||
downloadXFTPChunk,
|
||||
deleteXFTPChunk,
|
||||
ackXFTPChunk,
|
||||
pingXFTP,
|
||||
singleChunkSize,
|
||||
prepareChunkSizes,
|
||||
prepareChunkSpecs,
|
||||
getChunkDigest,
|
||||
) where
|
||||
|
||||
import qualified Control.Exception as E
|
||||
import Control.Logger.Simple
|
||||
@@ -41,11 +62,11 @@ import Simplex.Messaging.Client
|
||||
NetworkRequestMode (..),
|
||||
ProtocolClientError (..),
|
||||
TransportSession,
|
||||
netTimeoutInt,
|
||||
chooseTransportHost,
|
||||
defaultNetworkConfig,
|
||||
transportClientConfig,
|
||||
clientSocksCredentials,
|
||||
defaultNetworkConfig,
|
||||
netTimeoutInt,
|
||||
transportClientConfig,
|
||||
unexpectedResponse,
|
||||
clientHandlers,
|
||||
useWebPort,
|
||||
@@ -56,12 +77,12 @@ import Simplex.Messaging.Encoding (smpDecode, smpEncode)
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Protocol
|
||||
( BasicAuth,
|
||||
NetworkError (..),
|
||||
Protocol (..),
|
||||
ProtocolServer (..),
|
||||
RecipientId,
|
||||
SenderId,
|
||||
pattern NoEntity,
|
||||
NetworkError (..),
|
||||
)
|
||||
import Simplex.Messaging.Transport (ALPN, CertChainPubKey (..), HandshakeError (..), THandleAuth (..), THandleParams (..), TransportError (..), TransportPeer (..), defaultSupportedParams)
|
||||
import Simplex.Messaging.Transport.Client (TransportClientConfig (..), TransportHost)
|
||||
@@ -129,8 +150,9 @@ getXFTPClient transportSession@(_, srv, _) config@XFTPClientConfig {clientALPN,
|
||||
thParams0 = THandleParams {sessionId, blockSize = xftpBlockSize, thVersion = v, thServerVRange, thAuth = Nothing, implySessId = False, encryptBlock = Nothing, batch = True, serviceAuth = False}
|
||||
logDebug $ "Client negotiated handshake protocol: " <> tshow sessionALPN
|
||||
thParams@THandleParams {thVersion} <- case sessionALPN of
|
||||
Just alpn | alpn == xftpALPNv1 || alpn == httpALPN11 ->
|
||||
xftpClientHandshakeV1 serverVRange keyHash http2Client thParams0
|
||||
Just alpn
|
||||
| alpn == xftpALPNv1 || alpn == httpALPN11 ->
|
||||
xftpClientHandshakeV1 serverVRange keyHash http2Client thParams0
|
||||
_ -> pure thParams0
|
||||
logDebug $ "Client negotiated protocol: " <> tshow thVersion
|
||||
let c = XFTPClient {http2Client, thParams, transportSession, config}
|
||||
@@ -215,7 +237,7 @@ sendXFTPTransmission XFTPClient {config, thParams, http2Client} t chunkSpec_ = d
|
||||
HTTP2Response {respBody = body@HTTP2Body {bodyHead}} <- withExceptT xftpClientError . ExceptT $ sendRequest http2Client req (Just reqTimeout)
|
||||
when (B.length bodyHead /= xftpBlockSize) $ throwE $ PCEResponseError BLOCK
|
||||
-- TODO validate that the file ID is the same as in the request?
|
||||
(_, _fId, respOrErr) <-liftEither $ first PCEResponseError $ xftpDecodeTClient thParams bodyHead
|
||||
(_, _fId, respOrErr) <- liftEither $ first PCEResponseError $ xftpDecodeTClient thParams bodyHead
|
||||
case respOrErr of
|
||||
Right r -> case protocolError r of
|
||||
Just e -> throwE $ PCEProtocolError e
|
||||
|
||||
@@ -5,7 +5,17 @@
|
||||
{-# LANGUAGE NumericUnderscores #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Simplex.FileTransfer.Client.Agent where
|
||||
module Simplex.FileTransfer.Client.Agent
|
||||
( XFTPClientVar,
|
||||
XFTPClientAgent (..),
|
||||
XFTPClientAgentConfig (..),
|
||||
XFTPClientAgentError (..),
|
||||
defaultXFTPClientAgentConfig,
|
||||
newXFTPAgent,
|
||||
getXFTPServerClient,
|
||||
showServer,
|
||||
closeXFTPServerClient,
|
||||
) where
|
||||
|
||||
import Control.Logger.Simple (logInfo)
|
||||
import Control.Monad
|
||||
|
||||
@@ -16,6 +16,9 @@ module Simplex.FileTransfer.Client.Main
|
||||
xftpClientCLI,
|
||||
cliSendFile,
|
||||
cliSendFileOpts,
|
||||
encodeWebURI,
|
||||
decodeWebURI,
|
||||
fileWebLink,
|
||||
singleChunkSize,
|
||||
prepareChunkSizes,
|
||||
prepareChunkSpecs,
|
||||
@@ -23,6 +26,7 @@ module Simplex.FileTransfer.Client.Main
|
||||
)
|
||||
where
|
||||
|
||||
import qualified Codec.Compression.Zlib.Raw as Z
|
||||
import Control.Logger.Simple
|
||||
import Control.Monad
|
||||
import Control.Monad.Except
|
||||
@@ -30,17 +34,19 @@ import Control.Monad.Trans.Except
|
||||
import Crypto.Random (ChaChaDRG)
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import Data.Bifunctor (first)
|
||||
import qualified Data.ByteString.Base64.URL as U
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import qualified Data.ByteString.Lazy.Char8 as LB
|
||||
import Data.Char (toLower)
|
||||
import Data.Either (partitionEithers)
|
||||
import Data.Int (Int64)
|
||||
import Data.List (foldl', sortOn)
|
||||
import Data.List (foldl', isPrefixOf, sortOn)
|
||||
import Data.List.NonEmpty (NonEmpty (..), nonEmpty)
|
||||
import qualified Data.List.NonEmpty as L
|
||||
import Data.Map.Strict (Map)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (fromMaybe)
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
import Data.Word (Word32)
|
||||
import GHC.Records (HasField (getField))
|
||||
@@ -62,7 +68,7 @@ import qualified Simplex.Messaging.Crypto.Lazy as LC
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Encoding.String (StrEncoding (..))
|
||||
import Simplex.Messaging.Parsers (parseAll)
|
||||
import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), SenderId, SndPrivateAuthKey, XFTPServer, XFTPServerWithAuth)
|
||||
import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), ProtocolServer (..), SenderId, SndPrivateAuthKey, XFTPServer, XFTPServerWithAuth)
|
||||
import Simplex.Messaging.Server.CLI (getCliCommand')
|
||||
import Simplex.Messaging.Util (groupAllOn, ifM, tshow, whenM)
|
||||
import System.Exit (exitFailure)
|
||||
@@ -242,7 +248,8 @@ cliSendFile opts = cliSendFileOpts opts True $ printProgress "Uploaded"
|
||||
|
||||
cliSendFileOpts :: SendOptions -> Bool -> (Int64 -> Int64 -> IO ()) -> ExceptT CLIError IO ()
|
||||
cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, retryCount, tempPath, verbose} printInfo notifyProgress = do
|
||||
let (_, fileName) = splitFileName filePath
|
||||
let (_, fileNameStr) = splitFileName filePath
|
||||
fileName = T.pack fileNameStr
|
||||
liftIO $ when printInfo $ printNoNewLine "Encrypting file..."
|
||||
g <- liftIO C.newRandom
|
||||
(encPath, fdRcv, fdSnd, chunkSpecs, encSize) <- encryptFileForUpload g fileName
|
||||
@@ -254,14 +261,18 @@ cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, re
|
||||
liftIO $ do
|
||||
let fdRcvs = createRcvFileDescriptions fdRcv sentChunks
|
||||
fdSnd' = createSndFileDescription fdSnd sentChunks
|
||||
(fdRcvPaths, fdSndPath) <- writeFileDescriptions fileName fdRcvs fdSnd'
|
||||
(fdRcvPaths, fdSndPath) <- writeFileDescriptions fileNameStr fdRcvs fdSnd'
|
||||
when printInfo $ do
|
||||
printNoNewLine "File uploaded!"
|
||||
putStrLn $ "\nSender file description: " <> fdSndPath
|
||||
putStrLn "Pass file descriptions to the recipient(s):"
|
||||
forM_ fdRcvPaths putStrLn
|
||||
when printInfo $ case fdRcvs of
|
||||
rcvFd : _ -> forM_ (fileWebLink rcvFd) $ \(host, fragment) ->
|
||||
putStrLn $ "\nWeb link:\nhttps://" <> B.unpack host <> "/#" <> B.unpack fragment
|
||||
_ -> pure ()
|
||||
where
|
||||
encryptFileForUpload :: TVar ChaChaDRG -> String -> ExceptT CLIError IO (FilePath, FileDescription 'FRecipient, FileDescription 'FSender, [XFTPChunkSpec], Int64)
|
||||
encryptFileForUpload :: TVar ChaChaDRG -> Text -> ExceptT CLIError IO (FilePath, FileDescription 'FRecipient, FileDescription 'FSender, [XFTPChunkSpec], Int64)
|
||||
encryptFileForUpload g fileName = do
|
||||
fileSize <- fromInteger <$> getFileSize filePath
|
||||
when (fileSize > maxFileSize) $ throwE $ CLIError $ "Files bigger than " <> maxFileSizeStr <> " are not supported"
|
||||
@@ -387,10 +398,16 @@ cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, re
|
||||
|
||||
cliReceiveFile :: ReceiveOptions -> ExceptT CLIError IO ()
|
||||
cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath, verbose, yes} =
|
||||
getFileDescription' fileDescription >>= receive
|
||||
getInputFileDescription >>= receive 1
|
||||
where
|
||||
receive :: ValidFileDescription 'FRecipient -> ExceptT CLIError IO ()
|
||||
receive (ValidFileDescription FileDescription {size, digest, key, nonce, chunks}) = do
|
||||
getInputFileDescription
|
||||
| "http://" `isPrefixOf` fileDescription || "https://" `isPrefixOf` fileDescription = do
|
||||
let fragment = B.pack $ drop 1 $ dropWhile (/= '#') fileDescription
|
||||
when (B.null fragment) $ throwE $ CLIError "Invalid URL: no fragment"
|
||||
either (throwE . CLIError . ("Invalid web link: " <>)) pure $ decodeWebURI fragment
|
||||
| otherwise = getFileDescription' fileDescription
|
||||
receive :: Int -> ValidFileDescription 'FRecipient -> ExceptT CLIError IO ()
|
||||
receive depth (ValidFileDescription FileDescription {size, digest, key, nonce, chunks, redirect}) = do
|
||||
encPath <- getEncPath tempPath "xftp"
|
||||
createDirectory encPath
|
||||
a <- liftIO $ newXFTPAgent defaultXFTPClientAgentConfig
|
||||
@@ -408,13 +425,26 @@ cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath,
|
||||
when (encDigest /= unFileDigest digest) $ throwE $ CLIError "File digest mismatch"
|
||||
encSize <- liftIO $ foldM (\s path -> (s +) . fromIntegral <$> getFileSize path) 0 chunkPaths
|
||||
when (FileSize encSize /= size) $ throwE $ CLIError "File size mismatch"
|
||||
liftIO $ printNoNewLine "Decrypting file..."
|
||||
CryptoFile path _ <- withExceptT cliCryptoError $ decryptChunks encSize chunkPaths key nonce $ fmap CF.plain . getFilePath
|
||||
forM_ chunks $ acknowledgeFileChunk a
|
||||
whenM (doesPathExist encPath) $ removeDirectoryRecursive encPath
|
||||
liftIO $ do
|
||||
printNoNewLine $ "File downloaded: " <> path
|
||||
removeFD yes fileDescription
|
||||
case redirect of
|
||||
Just _
|
||||
| depth > 0 -> do
|
||||
CryptoFile tmpFile _ <- withExceptT cliCryptoError $ decryptChunks encSize chunkPaths key nonce $ \_ ->
|
||||
fmap CF.plain $ uniqueCombine encPath "redirect.yaml"
|
||||
forM_ chunks $ acknowledgeFileChunk a
|
||||
yaml <- liftIO $ B.readFile tmpFile
|
||||
whenM (doesPathExist encPath) $ removeDirectoryRecursive encPath
|
||||
innerVfd <- either (throwE . CLIError . ("Redirect: invalid file description: " <>)) pure $ strDecode yaml
|
||||
receive 0 innerVfd
|
||||
| otherwise -> throwE $ CLIError "Redirect chain too long"
|
||||
Nothing -> do
|
||||
liftIO $ printNoNewLine "Decrypting file..."
|
||||
CryptoFile path _ <- withExceptT cliCryptoError $ decryptChunks encSize chunkPaths key nonce $ fmap CF.plain . getFilePath
|
||||
forM_ chunks $ acknowledgeFileChunk a
|
||||
whenM (doesPathExist encPath) $ removeDirectoryRecursive encPath
|
||||
liftIO $ do
|
||||
printNoNewLine $ "File downloaded: " <> path
|
||||
unless ("http://" `isPrefixOf` fileDescription || "https://" `isPrefixOf` fileDescription) $
|
||||
removeFD yes fileDescription
|
||||
downloadFileChunk :: TVar ChaChaDRG -> XFTPClientAgent -> FilePath -> FileSize Int64 -> TVar [Int64] -> FileChunk -> ExceptT CLIError IO (Int, FilePath)
|
||||
downloadFileChunk g a encPath (FileSize encSize) downloadedChunks FileChunk {chunkNo, chunkSize, digest, replicas = replica : _} = do
|
||||
let FileChunkReplica {server, replicaId, replicaKey} = replica
|
||||
@@ -430,13 +460,14 @@ cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath,
|
||||
when verbose $ putStrLn ""
|
||||
pure (chunkNo, chunkPath)
|
||||
downloadFileChunk _ _ _ _ _ _ = throwE $ CLIError "chunk has no replicas"
|
||||
getFilePath :: String -> ExceptT String IO FilePath
|
||||
getFilePath name =
|
||||
case filePath of
|
||||
Just path ->
|
||||
ifM (doesDirectoryExist path) (uniqueCombine path name) $
|
||||
ifM (doesFileExist path) (throwE "File already exists") (pure path)
|
||||
_ -> (`uniqueCombine` name) . (</> "Downloads") =<< getHomeDirectory
|
||||
getFilePath :: Text -> ExceptT String IO FilePath
|
||||
getFilePath name = case filePath of
|
||||
Just path ->
|
||||
ifM (doesDirectoryExist path) (uniqueCombine path name') $
|
||||
ifM (doesFileExist path) (throwE "File already exists") (pure path)
|
||||
_ -> (`uniqueCombine` name') . (</> "Downloads") =<< getHomeDirectory
|
||||
where
|
||||
name' = T.unpack name
|
||||
acknowledgeFileChunk :: XFTPClientAgent -> FileChunk -> ExceptT CLIError IO ()
|
||||
acknowledgeFileChunk a FileChunk {replicas = replica : _} = do
|
||||
let FileChunkReplica {server, replicaId, replicaKey} = replica
|
||||
@@ -552,3 +583,24 @@ cliRandomFile RandomFileOptions {filePath, fileSize = FileSize size} = do
|
||||
B.hPut h bytes
|
||||
when (sz > mb') $ saveRandomFile h (sz - mb')
|
||||
mb' = mb 1
|
||||
|
||||
-- | Encode file description as web-compatible URI fragment.
|
||||
-- Result is base64url(deflateRaw(YAML)), no leading '#'.
|
||||
encodeWebURI :: FileDescription 'FRecipient -> B.ByteString
|
||||
encodeWebURI fd = U.encode $ LB.toStrict $ Z.compress $ LB.fromStrict $ strEncode fd
|
||||
|
||||
-- | Decode web URI fragment to validated file description.
|
||||
-- Input is base64url-encoded DEFLATE-compressed YAML, no leading '#'.
|
||||
decodeWebURI :: B.ByteString -> Either String (ValidFileDescription 'FRecipient)
|
||||
decodeWebURI fragment = do
|
||||
compressed <- U.decode fragment
|
||||
let yaml = LB.toStrict $ Z.decompress $ LB.fromStrict compressed
|
||||
strDecode yaml >>= validateFileDescription
|
||||
|
||||
-- | Extract web link host and URI fragment from a file description.
|
||||
-- Returns (hostname, uriFragment) for https://hostname/#uriFragment.
|
||||
fileWebLink :: FileDescription 'FRecipient -> Maybe (B.ByteString, B.ByteString)
|
||||
fileWebLink fd@FileDescription {chunks} = case chunks of
|
||||
(FileChunk {replicas = FileChunkReplica {server = ProtocolServer {host}} : _} : _) ->
|
||||
Just (strEncode (L.head host), encodeWebURI fd)
|
||||
_ -> Nothing
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
{-# LANGUAGE OverloadedLists #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Simplex.FileTransfer.Client.Presets where
|
||||
module Simplex.FileTransfer.Client.Presets
|
||||
( defaultXFTPServers,
|
||||
) where
|
||||
|
||||
import Data.List.NonEmpty (NonEmpty)
|
||||
import Simplex.Messaging.Protocol (XFTPServerWithAuth)
|
||||
|
||||
@@ -4,7 +4,11 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
|
||||
module Simplex.FileTransfer.Crypto where
|
||||
module Simplex.FileTransfer.Crypto
|
||||
( encryptFile,
|
||||
decryptChunks,
|
||||
readChunks,
|
||||
) where
|
||||
|
||||
import Control.Monad
|
||||
import Control.Monad.Except
|
||||
@@ -16,6 +20,7 @@ import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import qualified Data.ByteString.Lazy.Char8 as LB
|
||||
import Data.Int (Int64)
|
||||
import Data.Text (Text)
|
||||
import Simplex.FileTransfer.Types (FileHeader (..), authTagSize)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Crypto.File (CryptoFile (..), FTCryptoError (..))
|
||||
@@ -54,7 +59,7 @@ encryptFile srcFile fileHdr key nonce fileSize' encSize encFile = do
|
||||
liftIO $ B.hPut w ch'
|
||||
encryptChunks_ get w (sb', len - chSize)
|
||||
|
||||
decryptChunks :: Int64 -> [FilePath] -> C.SbKey -> C.CbNonce -> (String -> ExceptT String IO CryptoFile) -> ExceptT FTCryptoError IO CryptoFile
|
||||
decryptChunks :: Int64 -> [FilePath] -> C.SbKey -> C.CbNonce -> (Text -> ExceptT String IO CryptoFile) -> ExceptT FTCryptoError IO CryptoFile
|
||||
decryptChunks _ [] _ _ _ = throwE $ FTCEInvalidHeader "empty"
|
||||
decryptChunks encSize (chPath : chPaths) key nonce getDestFile = case reverse chPaths of
|
||||
[] -> do
|
||||
|
||||
@@ -14,7 +14,25 @@
|
||||
{-# LANGUAGE TypeFamilies #-}
|
||||
{-# OPTIONS_GHC -fno-warn-unticked-promoted-constructors #-}
|
||||
|
||||
module Simplex.FileTransfer.Protocol where
|
||||
module Simplex.FileTransfer.Protocol
|
||||
( FileParty (..),
|
||||
SFileParty (..),
|
||||
AFileParty (..),
|
||||
FilePartyI (..),
|
||||
FileCommand (..),
|
||||
FileCmd (..),
|
||||
FileInfo (..),
|
||||
XFTPFileId,
|
||||
FileResponse (..),
|
||||
xftpBlockSize,
|
||||
toFileParty,
|
||||
aFileParty,
|
||||
checkParty,
|
||||
xftpEncodeAuthTransmission,
|
||||
xftpEncodeTransmission,
|
||||
xftpDecodeTServer,
|
||||
xftpDecodeTClient,
|
||||
) where
|
||||
|
||||
import qualified Data.Aeson.TH as J
|
||||
import Data.Bifunctor (first)
|
||||
|
||||
@@ -13,7 +13,10 @@
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
|
||||
module Simplex.FileTransfer.Server where
|
||||
module Simplex.FileTransfer.Server
|
||||
( runXFTPServer,
|
||||
runXFTPServerBlocking,
|
||||
) where
|
||||
|
||||
import Control.Logger.Simple
|
||||
import Control.Monad
|
||||
@@ -40,10 +43,11 @@ import GHC.IO.Handle (hSetNewlineMode)
|
||||
import GHC.IORef (atomicSwapIORef)
|
||||
import GHC.Stats (getRTSStats)
|
||||
import qualified Network.HTTP.Types as N
|
||||
import Network.HPACK.Token (tokenKey)
|
||||
import qualified Network.HTTP2.Server as H
|
||||
import Network.Socket
|
||||
import Simplex.FileTransfer.Protocol
|
||||
import Simplex.FileTransfer.Server.Control
|
||||
import Simplex.FileTransfer.Server.Control (ControlProtocol (..))
|
||||
import Simplex.FileTransfer.Server.Env
|
||||
import Simplex.FileTransfer.Server.Prometheus
|
||||
import Simplex.FileTransfer.Server.Stats
|
||||
@@ -63,12 +67,12 @@ import Simplex.Messaging.Server.Stats
|
||||
import Simplex.Messaging.SystemTime
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Transport (CertChainPubKey (..), SessionId, THandleAuth (..), THandleParams (..), TransportPeer (..), defaultSupportedParams)
|
||||
import Simplex.Messaging.Transport (CertChainPubKey (..), SessionId, THandleAuth (..), THandleParams (..), TransportPeer (..), defaultSupportedParams, defaultSupportedParamsHTTPS)
|
||||
import Simplex.Messaging.Transport.Buffer (trimCR)
|
||||
import Simplex.Messaging.Transport.HTTP2
|
||||
import Simplex.Messaging.Transport.HTTP2.File (fileBlockSize)
|
||||
import Simplex.Messaging.Transport.HTTP2.Server
|
||||
import Simplex.Messaging.Transport.Server (runLocalTCPServer)
|
||||
import Simplex.Messaging.Transport.HTTP2.Server (runHTTP2Server)
|
||||
import Simplex.Messaging.Transport.Server (SNICredentialUsed, TransportServerConfig (..), runLocalTCPServer)
|
||||
import Simplex.Messaging.Util
|
||||
import Simplex.Messaging.Version
|
||||
import System.Environment (lookupEnv)
|
||||
@@ -89,9 +93,24 @@ data XFTPTransportRequest = XFTPTransportRequest
|
||||
{ thParams :: THandleParamsXFTP 'TServer,
|
||||
reqBody :: HTTP2Body,
|
||||
request :: H.Request,
|
||||
sendResponse :: H.Response -> IO ()
|
||||
sendResponse :: H.Response -> IO (),
|
||||
sniUsed :: SNICredentialUsed,
|
||||
addCORS :: Bool
|
||||
}
|
||||
|
||||
corsHeaders :: Bool -> [N.Header]
|
||||
corsHeaders addCORS
|
||||
| addCORS = [("Access-Control-Allow-Origin", "*"), ("Access-Control-Expose-Headers", "*")]
|
||||
| otherwise = []
|
||||
|
||||
corsPreflightHeaders :: [N.Header]
|
||||
corsPreflightHeaders =
|
||||
[ ("Access-Control-Allow-Origin", "*"),
|
||||
("Access-Control-Allow-Methods", "POST, OPTIONS"),
|
||||
("Access-Control-Allow-Headers", "*"),
|
||||
("Access-Control-Max-Age", "86400")
|
||||
]
|
||||
|
||||
runXFTPServer :: XFTPServerConfig -> IO ()
|
||||
runXFTPServer cfg = do
|
||||
started <- newEmptyTMVarIO
|
||||
@@ -120,45 +139,73 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira
|
||||
runServer :: M ()
|
||||
runServer = do
|
||||
srvCreds@(chain, pk) <- asks tlsServerCreds
|
||||
httpCreds_ <- asks httpServerCreds
|
||||
signKey <- liftIO $ case C.x509ToPrivate' pk of
|
||||
Right pk' -> pure pk'
|
||||
Left e -> putStrLn ("Server has no valid key: " <> show e) >> exitFailure
|
||||
env <- ask
|
||||
sessions <- liftIO TM.emptyIO
|
||||
let cleanup sessionId = atomically $ TM.delete sessionId sessions
|
||||
liftIO . runHTTP2Server started xftpPort defaultHTTP2BufferSize defaultSupportedParams srvCreds transportConfig inactiveClientExpiration cleanup $ \sessionId sessionALPN r sendResponse -> do
|
||||
reqBody <- getHTTP2Body r xftpBlockSize
|
||||
let v = VersionXFTP 1
|
||||
thServerVRange = versionToRange v
|
||||
thParams0 = THandleParams {sessionId, blockSize = xftpBlockSize, thVersion = v, thServerVRange, thAuth = Nothing, implySessId = False, encryptBlock = Nothing, batch = True, serviceAuth = False}
|
||||
req0 = XFTPTransportRequest {thParams = thParams0, request = r, reqBody, sendResponse}
|
||||
flip runReaderT env $ case sessionALPN of
|
||||
Nothing -> processRequest req0
|
||||
Just alpn | alpn == xftpALPNv1 || alpn == httpALPN11 ->
|
||||
xftpServerHandshakeV1 chain signKey sessions req0 >>= \case
|
||||
Nothing -> pure () -- handshake response sent
|
||||
Just thParams -> processRequest req0 {thParams} -- proceed with new version (XXX: may as well switch the request handler here)
|
||||
_ -> liftIO . sendResponse $ H.responseNoBody N.ok200 [] -- shouldn't happen: means server picked handshake protocol it doesn't know about
|
||||
srvParams = if isJust httpCreds_ then defaultSupportedParamsHTTPS else defaultSupportedParams
|
||||
liftIO . runHTTP2Server started xftpPort defaultHTTP2BufferSize srvParams srvCreds httpCreds_ transportConfig inactiveClientExpiration cleanup $ \sniUsed sessionId sessionALPN r sendResponse -> do
|
||||
let addCORS' = sniUsed && addCORSHeaders transportConfig
|
||||
if addCORS' && H.requestMethod r == Just "OPTIONS"
|
||||
then sendResponse $ H.responseNoBody N.ok200 corsPreflightHeaders
|
||||
else do
|
||||
reqBody <- getHTTP2Body r xftpBlockSize
|
||||
let v = VersionXFTP 1
|
||||
thServerVRange = versionToRange v
|
||||
thParams0 = THandleParams {sessionId, blockSize = xftpBlockSize, thVersion = v, thServerVRange, thAuth = Nothing, implySessId = False, encryptBlock = Nothing, batch = True, serviceAuth = False}
|
||||
req0 = XFTPTransportRequest {thParams = thParams0, request = r, reqBody, sendResponse, sniUsed, addCORS = addCORS'}
|
||||
flip runReaderT env $ case sessionALPN of
|
||||
Nothing -> processRequest req0
|
||||
Just alpn
|
||||
| alpn == xftpALPNv1 || alpn == httpALPN11 || (sniUsed && alpn == "h2") ->
|
||||
xftpServerHandshakeV1 chain signKey sessions req0 >>= \case
|
||||
Nothing -> pure ()
|
||||
Just thParams -> processRequest req0 {thParams}
|
||||
| otherwise -> liftIO . sendResponse $ H.responseNoBody N.ok200 (corsHeaders addCORS')
|
||||
xftpServerHandshakeV1 :: X.CertificateChain -> C.APrivateSignKey -> TMap SessionId Handshake -> XFTPTransportRequest -> M (Maybe (THandleParams XFTPVersion 'TServer))
|
||||
xftpServerHandshakeV1 chain serverSignKey sessions XFTPTransportRequest {thParams = thParams0@THandleParams {sessionId}, reqBody = HTTP2Body {bodyHead}, sendResponse} = do
|
||||
xftpServerHandshakeV1 chain serverSignKey sessions XFTPTransportRequest {thParams = thParams0@THandleParams {sessionId}, request, reqBody = HTTP2Body {bodyHead}, sendResponse, sniUsed, addCORS} = do
|
||||
s <- atomically $ TM.lookup sessionId sessions
|
||||
r <- runExceptT $ case s of
|
||||
Nothing -> processHello
|
||||
Just (HandshakeSent pk) -> processClientHandshake pk
|
||||
Just (HandshakeAccepted thParams) -> pure $ Just thParams
|
||||
Nothing
|
||||
| sniUsed && not webHello -> throwE SESSION
|
||||
| otherwise -> processHello Nothing
|
||||
Just (HandshakeSent pk)
|
||||
| webHello -> processHello (Just pk)
|
||||
| otherwise -> processClientHandshake pk
|
||||
Just (HandshakeAccepted thParams)
|
||||
| webHello -> processHello (serverPrivKey <$> thAuth thParams)
|
||||
| webHandshake, Just auth <- thAuth thParams -> processClientHandshake (serverPrivKey auth)
|
||||
| otherwise -> pure $ Just thParams
|
||||
either sendError pure r
|
||||
where
|
||||
processHello = do
|
||||
unless (B.null bodyHead) $ throwE HANDSHAKE
|
||||
(k, pk) <- atomically . C.generateKeyPair =<< asks random
|
||||
atomically $ TM.insert sessionId (HandshakeSent pk) sessions
|
||||
webHello = sniUsed && any (\(t, _) -> tokenKey t == "xftp-web-hello") (fst $ H.requestHeaders request)
|
||||
webHandshake = sniUsed && any (\(t, _) -> tokenKey t == "xftp-handshake") (fst $ H.requestHeaders request)
|
||||
processHello pk_ = do
|
||||
challenge_ <-
|
||||
if
|
||||
| B.null bodyHead -> pure Nothing
|
||||
| sniUsed -> do
|
||||
body <- liftHS $ C.unPad bodyHead
|
||||
XFTPClientHello {webChallenge} <- liftHS $ first show (smpDecode body)
|
||||
pure webChallenge
|
||||
| otherwise -> throwE HANDSHAKE
|
||||
rng <- asks random
|
||||
k <- atomically $ TM.lookup sessionId sessions >>= \case
|
||||
Just (HandshakeSent pk') -> pure $ C.publicKey pk'
|
||||
_ -> do
|
||||
kp <- maybe (C.generateKeyPair rng) (\p -> pure (C.publicKey p, p)) pk_
|
||||
fst kp <$ TM.insert sessionId (HandshakeSent $ snd kp) sessions
|
||||
let authPubKey = CertChainPubKey chain (C.signX509 serverSignKey $ C.publicToX509 k)
|
||||
let hs = XFTPServerHandshake {xftpVersionRange = xftpServerVRange, sessionId, authPubKey}
|
||||
webIdentityProof = C.sign serverSignKey . (<> sessionId) <$> challenge_
|
||||
let hs = XFTPServerHandshake {xftpVersionRange = xftpServerVRange, sessionId, authPubKey, webIdentityProof}
|
||||
shs <- encodeXftp hs
|
||||
#ifdef slow_servers
|
||||
lift randomDelay
|
||||
#endif
|
||||
liftIO . sendResponse $ H.responseBuilder N.ok200 [] shs
|
||||
liftIO . sendResponse $ H.responseBuilder N.ok200 (corsHeaders addCORS) shs
|
||||
pure Nothing
|
||||
processClientHandshake pk = do
|
||||
unless (B.length bodyHead == xftpBlockSize) $ throwE HANDSHAKE
|
||||
@@ -174,13 +221,13 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira
|
||||
#ifdef slow_servers
|
||||
lift randomDelay
|
||||
#endif
|
||||
liftIO . sendResponse $ H.responseNoBody N.ok200 []
|
||||
liftIO . sendResponse $ H.responseNoBody N.ok200 (corsHeaders addCORS)
|
||||
pure Nothing
|
||||
Nothing -> throwE HANDSHAKE
|
||||
sendError :: XFTPErrorType -> M (Maybe (THandleParams XFTPVersion 'TServer))
|
||||
sendError err = do
|
||||
runExceptT (encodeXftp err) >>= \case
|
||||
Right bs -> liftIO . sendResponse $ H.responseBuilder N.ok200 [] bs
|
||||
Right bs -> liftIO . sendResponse $ H.responseBuilder N.ok200 (corsHeaders addCORS) bs
|
||||
Left _ -> logError $ "Error encoding handshake error: " <> tshow err
|
||||
pure Nothing
|
||||
encodeXftp :: Encoding a => a -> ExceptT XFTPErrorType (ReaderT XFTPEnv IO) Builder
|
||||
@@ -330,6 +377,7 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira
|
||||
CPHelp -> hPutStrLn h "commands: stats-rts, delete, help, quit"
|
||||
CPQuit -> pure ()
|
||||
CPSkip -> pure ()
|
||||
_ -> hPutStrLn h "unsupported command"
|
||||
where
|
||||
withUserRole action =
|
||||
readTVarIO role >>= \case
|
||||
@@ -346,7 +394,7 @@ data ServerFile = ServerFile
|
||||
}
|
||||
|
||||
processRequest :: XFTPTransportRequest -> M ()
|
||||
processRequest XFTPTransportRequest {thParams, reqBody = body@HTTP2Body {bodyHead}, sendResponse}
|
||||
processRequest XFTPTransportRequest {thParams, reqBody = body@HTTP2Body {bodyHead}, sendResponse, addCORS}
|
||||
| B.length bodyHead /= xftpBlockSize = sendXFTPResponse ("", NoEntity, FRErr BLOCK) Nothing
|
||||
| otherwise =
|
||||
case xftpDecodeTServer thParams bodyHead of
|
||||
@@ -365,7 +413,7 @@ processRequest XFTPTransportRequest {thParams, reqBody = body@HTTP2Body {bodyHea
|
||||
#ifdef slow_servers
|
||||
randomDelay
|
||||
#endif
|
||||
liftIO $ sendResponse $ H.responseStreaming N.ok200 [] $ streamBody t_
|
||||
liftIO $ sendResponse $ H.responseStreaming N.ok200 (corsHeaders addCORS) $ streamBody t_
|
||||
where
|
||||
streamBody t_ send done = do
|
||||
case t_ of
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Simplex.FileTransfer.Server.Control where
|
||||
module Simplex.FileTransfer.Server.Control
|
||||
( ControlProtocol (..),
|
||||
)
|
||||
where
|
||||
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import Simplex.FileTransfer.Protocol (XFTPFileId)
|
||||
|
||||
@@ -7,7 +7,16 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE StrictData #-}
|
||||
|
||||
module Simplex.FileTransfer.Server.Env where
|
||||
module Simplex.FileTransfer.Server.Env
|
||||
( XFTPServerConfig (..),
|
||||
XFTPEnv (..),
|
||||
XFTPRequest (..),
|
||||
defaultInactiveClientExpiration,
|
||||
defFileExpirationHours,
|
||||
defaultFileExpiration,
|
||||
newXFTPServerEnv,
|
||||
countUsedStorage,
|
||||
) where
|
||||
|
||||
import Control.Logger.Simple
|
||||
import Control.Monad
|
||||
@@ -57,6 +66,7 @@ data XFTPServerConfig = XFTPServerConfig
|
||||
-- | time after which inactive clients can be disconnected and check interval, seconds
|
||||
inactiveClientExpiration :: Maybe ExpirationConfig,
|
||||
xftpCredentials :: ServerCredentials,
|
||||
httpCredentials :: Maybe ServerCredentials,
|
||||
-- | XFTP client-server protocol version range
|
||||
xftpServerVRange :: VersionRangeXFTP,
|
||||
-- stats config - see SMP server config
|
||||
@@ -84,6 +94,7 @@ data XFTPEnv = XFTPEnv
|
||||
random :: TVar ChaChaDRG,
|
||||
serverIdentity :: C.KeyHash,
|
||||
tlsServerCreds :: T.Credential,
|
||||
httpServerCreds :: Maybe T.Credential,
|
||||
serverStats :: FileServerStats
|
||||
}
|
||||
|
||||
@@ -98,7 +109,7 @@ defaultFileExpiration =
|
||||
}
|
||||
|
||||
newXFTPServerEnv :: XFTPServerConfig -> IO XFTPEnv
|
||||
newXFTPServerEnv config@XFTPServerConfig {storeLogFile, fileSizeQuota, xftpCredentials} = do
|
||||
newXFTPServerEnv config@XFTPServerConfig {storeLogFile, fileSizeQuota, xftpCredentials, httpCredentials} = do
|
||||
random <- C.newRandom
|
||||
store <- newFileStore
|
||||
storeLog <- mapM (`readWriteFileStore` store) storeLogFile
|
||||
@@ -108,9 +119,10 @@ newXFTPServerEnv config@XFTPServerConfig {storeLogFile, fileSizeQuota, xftpCrede
|
||||
logNote $ "Total / available storage: " <> tshow quota <> " / " <> tshow (quota - used)
|
||||
when (quota < used) $ logWarn "WARNING: storage quota is less than used storage, no files can be uploaded!"
|
||||
tlsServerCreds <- loadServerCredential xftpCredentials
|
||||
httpServerCreds <- mapM loadServerCredential httpCredentials
|
||||
Fingerprint fp <- loadFingerprint xftpCredentials
|
||||
serverStats <- newFileServerStats =<< getCurrentTime
|
||||
pure XFTPEnv {config, store, storeLog, random, tlsServerCreds, serverIdentity = C.KeyHash fp, serverStats}
|
||||
pure XFTPEnv {config, store, storeLog, random, tlsServerCreds, httpServerCreds, serverIdentity = C.KeyHash fp, serverStats}
|
||||
|
||||
countUsedStorage :: M.Map k FileRec -> Int64
|
||||
countUsedStorage = M.foldl' (\acc FileRec {fileInfo = FileInfo {size}} -> acc + fromIntegral size) 0
|
||||
|
||||
@@ -6,13 +6,15 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE PatternSynonyms #-}
|
||||
|
||||
module Simplex.FileTransfer.Server.Main where
|
||||
module Simplex.FileTransfer.Server.Main
|
||||
( xftpServerCLI,
|
||||
) where
|
||||
|
||||
import Data.Either (fromRight)
|
||||
import Data.Functor (($>))
|
||||
import Data.Ini (lookupValue, readIniFile)
|
||||
import Data.Int (Int64)
|
||||
import Data.Maybe (fromMaybe)
|
||||
import Data.Maybe (fromMaybe, isJust)
|
||||
import qualified Data.Text as T
|
||||
import qualified Data.Text.IO as T
|
||||
import Network.Socket (HostName)
|
||||
@@ -21,7 +23,7 @@ import Simplex.FileTransfer.Chunks
|
||||
import Simplex.FileTransfer.Description (FileSize (..))
|
||||
import Simplex.FileTransfer.Server (runXFTPServer)
|
||||
import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..), defFileExpirationHours, defaultFileExpiration, defaultInactiveClientExpiration)
|
||||
import Simplex.FileTransfer.Transport (supportedFileServerVRange, alpnSupportedXFTPhandshakes)
|
||||
import Simplex.FileTransfer.Transport (alpnSupportedXFTPhandshakes, supportedFileServerVRange)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), pattern XFTPServer)
|
||||
@@ -29,7 +31,7 @@ import Simplex.Messaging.Server.CLI
|
||||
import Simplex.Messaging.Server.Expiration
|
||||
import Simplex.Messaging.Transport.Client (TransportHost (..))
|
||||
import Simplex.Messaging.Transport.HTTP2 (httpALPN)
|
||||
import Simplex.Messaging.Transport.Server (ServerCredentials (..), mkTransportServerConfig)
|
||||
import Simplex.Messaging.Transport.Server (ServerCredentials (..), TransportServerConfig (..), mkTransportServerConfig)
|
||||
import Simplex.Messaging.Util (eitherToMaybe, safeDecodeUtf8, tshow)
|
||||
import System.Directory (createDirectoryIfMissing, doesFileExist)
|
||||
import System.FilePath (combine)
|
||||
@@ -124,6 +126,10 @@ xftpServerCLI cfgPath logPath = do
|
||||
\disconnect: off\n"
|
||||
<> ("# ttl: " <> tshow (ttl defaultInactiveClientExpiration) <> "\n")
|
||||
<> ("# check_interval: " <> tshow (checkInterval defaultInactiveClientExpiration) <> "\n")
|
||||
<> "\n\
|
||||
\[WEB]\n\
|
||||
\# cert: /etc/opt/simplex-xftp/web.crt\n\
|
||||
\# key: /etc/opt/simplex-xftp/web.key\n"
|
||||
runServer ini = do
|
||||
hSetBuffering stdout LineBuffering
|
||||
hSetBuffering stderr LineBuffering
|
||||
@@ -155,6 +161,17 @@ xftpServerCLI cfgPath logPath = do
|
||||
else "NOT allowed."
|
||||
putStrLn $ "Listening on port " <> xftpPort <> "..."
|
||||
|
||||
httpCredentials_ =
|
||||
eitherToMaybe $ do
|
||||
cert <- T.unpack <$> lookupValue "WEB" "cert" ini
|
||||
key <- T.unpack <$> lookupValue "WEB" "key" ini
|
||||
pure
|
||||
ServerCredentials
|
||||
{ caCertificateFile = Nothing,
|
||||
certificateFile = cert,
|
||||
privateKeyFile = key
|
||||
}
|
||||
|
||||
serverConfig =
|
||||
XFTPServerConfig
|
||||
{ xftpPort = T.unpack $ strictIni "TRANSPORT" "port" ini,
|
||||
@@ -186,6 +203,7 @@ xftpServerCLI cfgPath logPath = do
|
||||
privateKeyFile = c serverKeyFile,
|
||||
certificateFile = c serverCrtFile
|
||||
},
|
||||
httpCredentials = httpCredentials_,
|
||||
xftpServerVRange = supportedFileServerVRange,
|
||||
logStatsInterval = logStats $> 86400, -- seconds
|
||||
logStatsStartTime = 0, -- seconds from 00:00 UTC
|
||||
@@ -194,10 +212,12 @@ xftpServerCLI cfgPath logPath = do
|
||||
prometheusInterval = eitherToMaybe $ read . T.unpack <$> lookupValue "STORE_LOG" "prometheus_interval" ini,
|
||||
prometheusMetricsFile = combine logPath "xftp-server-metrics.txt",
|
||||
transportConfig =
|
||||
mkTransportServerConfig
|
||||
(fromMaybe False $ iniOnOff "TRANSPORT" "log_tls_errors" ini)
|
||||
(Just $ alpnSupportedXFTPhandshakes <> httpALPN)
|
||||
False,
|
||||
let cfg =
|
||||
mkTransportServerConfig
|
||||
(fromMaybe False $ iniOnOff "TRANSPORT" "log_tls_errors" ini)
|
||||
(Just $ alpnSupportedXFTPhandshakes <> httpALPN)
|
||||
False
|
||||
in cfg {addCORSHeaders = isJust httpCredentials_},
|
||||
responseDelay = 0
|
||||
}
|
||||
|
||||
@@ -229,11 +249,14 @@ cliCommandP cfgPath logPath iniFile =
|
||||
initP :: Parser InitOptions
|
||||
initP = do
|
||||
enableStoreLog <-
|
||||
flag' False
|
||||
flag'
|
||||
False
|
||||
( long "disable-store-log"
|
||||
<> help "Disable store log for persistence (enabled by default)"
|
||||
)
|
||||
<|> flag True True
|
||||
<|> flag
|
||||
True
|
||||
True
|
||||
( long "store-log"
|
||||
<> short 'l'
|
||||
<> help "Enable store log for persistence (DEPRECATED, enabled by default)"
|
||||
|
||||
@@ -4,7 +4,11 @@
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
{-# OPTIONS_GHC -fno-warn-unrecognised-pragmas #-}
|
||||
|
||||
module Simplex.FileTransfer.Server.Prometheus where
|
||||
module Simplex.FileTransfer.Server.Prometheus
|
||||
( FileServerMetrics (..),
|
||||
rtsOptionsEnv,
|
||||
xftpPrometheusMetrics,
|
||||
) where
|
||||
|
||||
import Data.Int (Int64)
|
||||
import Data.Text (Text)
|
||||
|
||||
@@ -2,7 +2,13 @@
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Simplex.FileTransfer.Server.Stats where
|
||||
module Simplex.FileTransfer.Server.Stats
|
||||
( FileServerStats (..),
|
||||
FileServerStatsData (..),
|
||||
newFileServerStats,
|
||||
getFileServerStatsData,
|
||||
setFileServerStats,
|
||||
) where
|
||||
|
||||
import Control.Applicative ((<|>))
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
|
||||
@@ -19,6 +19,7 @@ module Simplex.FileTransfer.Transport
|
||||
-- xftpClientHandshake,
|
||||
XFTPServerHandshake (..),
|
||||
-- xftpServerHandshake,
|
||||
XFTPClientHello (..),
|
||||
THandleXFTP,
|
||||
THandleParamsXFTP,
|
||||
VersionXFTP,
|
||||
@@ -35,6 +36,7 @@ module Simplex.FileTransfer.Transport
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Applicative (optional)
|
||||
import qualified Control.Exception as E
|
||||
import Control.Logger.Simple
|
||||
import Control.Monad
|
||||
@@ -60,7 +62,7 @@ import Simplex.Messaging.Parsers
|
||||
import Simplex.Messaging.Protocol (BlockingInfo, CommandError)
|
||||
import Simplex.Messaging.Transport (ALPN, CertChainPubKey, ServiceCredentials, SessionId, THandle (..), THandleParams (..), TransportError (..), TransportPeer (..))
|
||||
import Simplex.Messaging.Transport.HTTP2.File
|
||||
import Simplex.Messaging.Util (bshow, tshow)
|
||||
import Simplex.Messaging.Util (bshow, tshow, (<$?>))
|
||||
import Simplex.Messaging.Version
|
||||
import Simplex.Messaging.Version.Internal
|
||||
import System.IO (Handle, IOMode (..), withFile)
|
||||
@@ -111,11 +113,18 @@ alpnSupportedXFTPhandshakes = [xftpALPNv1]
|
||||
xftpALPNv1 :: ALPN
|
||||
xftpALPNv1 = "xftp/1"
|
||||
|
||||
data XFTPClientHello = XFTPClientHello
|
||||
{ -- | a random string sent by the client to the server to prove that server has identity certificate
|
||||
webChallenge :: Maybe ByteString
|
||||
}
|
||||
|
||||
data XFTPServerHandshake = XFTPServerHandshake
|
||||
{ xftpVersionRange :: VersionRangeXFTP,
|
||||
sessionId :: SessionId,
|
||||
-- | pub key to agree shared secrets for command authorization and entity ID encryption.
|
||||
authPubKey :: CertChainPubKey
|
||||
authPubKey :: CertChainPubKey,
|
||||
-- | signed identity challenge from XFTPClientHello
|
||||
webIdentityProof :: Maybe C.ASignature
|
||||
}
|
||||
|
||||
data XFTPClientHandshake = XFTPClientHandshake
|
||||
@@ -125,6 +134,14 @@ data XFTPClientHandshake = XFTPClientHandshake
|
||||
keyHash :: C.KeyHash
|
||||
}
|
||||
|
||||
instance Encoding XFTPClientHello where
|
||||
smpEncode XFTPClientHello {webChallenge} = smpEncode webChallenge
|
||||
smpP = do
|
||||
webChallenge <- smpP
|
||||
forM_ webChallenge $ \challenge -> unless (B.length challenge == 32) $ fail "bad XFTPClientHello webChallenge"
|
||||
Tail _compat <- smpP
|
||||
pure XFTPClientHello {webChallenge}
|
||||
|
||||
instance Encoding XFTPClientHandshake where
|
||||
smpEncode XFTPClientHandshake {xftpVersion, keyHash} =
|
||||
smpEncode (xftpVersion, keyHash)
|
||||
@@ -134,13 +151,13 @@ instance Encoding XFTPClientHandshake where
|
||||
pure XFTPClientHandshake {xftpVersion, keyHash}
|
||||
|
||||
instance Encoding XFTPServerHandshake where
|
||||
smpEncode XFTPServerHandshake {xftpVersionRange, sessionId, authPubKey} =
|
||||
smpEncode (xftpVersionRange, sessionId, authPubKey)
|
||||
smpEncode XFTPServerHandshake {xftpVersionRange, sessionId, authPubKey, webIdentityProof} =
|
||||
smpEncode (xftpVersionRange, sessionId, authPubKey, C.signatureBytes webIdentityProof)
|
||||
smpP = do
|
||||
(xftpVersionRange, sessionId) <- smpP
|
||||
authPubKey <- smpP
|
||||
(xftpVersionRange, sessionId, authPubKey) <- smpP
|
||||
webIdentityProof <- optional $ C.decodeSignature <$?> smpP
|
||||
Tail _compat <- smpP
|
||||
pure XFTPServerHandshake {xftpVersionRange, sessionId, authPubKey}
|
||||
pure XFTPServerHandshake {xftpVersionRange, sessionId, authPubKey, webIdentityProof}
|
||||
|
||||
sendEncFile :: Handle -> (Builder -> IO ()) -> LC.SbState -> Word32 -> IO ()
|
||||
sendEncFile h send = go
|
||||
|
||||
@@ -4,12 +4,36 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE TemplateHaskell #-}
|
||||
|
||||
module Simplex.FileTransfer.Types where
|
||||
module Simplex.FileTransfer.Types
|
||||
( RcvFileId,
|
||||
SndFileId,
|
||||
FileHeader (..),
|
||||
DBRcvFileId,
|
||||
RcvFile (..),
|
||||
RcvFileStatus (..),
|
||||
RcvFileChunk (..),
|
||||
RcvFileChunkReplica (..),
|
||||
RcvFileRedirect (..),
|
||||
DBSndFileId,
|
||||
SndFile (..),
|
||||
SndFileStatus (..),
|
||||
SndFileChunk (..),
|
||||
NewSndChunkReplica (..),
|
||||
SndFileChunkReplica (..),
|
||||
SndFileReplicaStatus (..),
|
||||
DeletedSndChunkReplica (..),
|
||||
SentRecipientReplica (..),
|
||||
FileErrorType (..),
|
||||
authTagSize,
|
||||
sndFileEncPath,
|
||||
sndChunkSize,
|
||||
) where
|
||||
|
||||
import qualified Data.Aeson.TH as J
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import Data.Int (Int64)
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
import Data.Text.Encoding (encodeUtf8)
|
||||
import Data.Word (Word32)
|
||||
@@ -33,8 +57,8 @@ authTagSize = fromIntegral C.authTagSize
|
||||
|
||||
-- fileExtra is added to allow header extension in future versions
|
||||
data FileHeader = FileHeader
|
||||
{ fileName :: String,
|
||||
fileExtra :: Maybe String
|
||||
{ fileName :: Text,
|
||||
fileExtra :: Maybe Text
|
||||
}
|
||||
deriving (Eq, Show)
|
||||
|
||||
|
||||
+121
-34
@@ -60,6 +60,8 @@ module Simplex.Messaging.Agent
|
||||
deleteConnectionAsync,
|
||||
deleteConnectionsAsync,
|
||||
createConnection,
|
||||
prepareConnectionLink,
|
||||
createConnectionForLink,
|
||||
setConnShortLink,
|
||||
deleteConnShortLink,
|
||||
getConnShortLink,
|
||||
@@ -409,6 +411,19 @@ createConnection :: ConnectionModeI c => AgentClient -> NetworkRequestMode -> Us
|
||||
createConnection c nm userId enableNtfs checkNotices = withAgentEnv c .::. newConn c nm userId enableNtfs checkNotices
|
||||
{-# INLINE createConnection #-}
|
||||
|
||||
-- | Prepare connection link for contact mode (no network call).
|
||||
-- Returns root key pair (for signing OwnerAuth), the created link, and internal params.
|
||||
-- The link address is fully determined at this point.
|
||||
prepareConnectionLink :: AgentClient -> UserId -> Maybe ByteString -> Bool -> Maybe CRClientData -> AE (C.KeyPairEd25519, CreatedConnLink 'CMContact, PreparedLinkParams)
|
||||
prepareConnectionLink c userId linkEntityId checkNotices = withAgentEnv c . prepareConnectionLink' c userId linkEntityId checkNotices
|
||||
{-# INLINE prepareConnectionLink #-}
|
||||
|
||||
-- | Create connection for prepared link (single network call).
|
||||
-- Validates that server response matches the prepared link.
|
||||
createConnectionForLink :: AgentClient -> NetworkRequestMode -> UserId -> Bool -> CreatedConnLink 'CMContact -> PreparedLinkParams -> UserConnLinkData 'CMContact -> CR.InitialKeys -> SubscriptionMode -> AE ConnId
|
||||
createConnectionForLink c nm userId enableNtfs = withAgentEnv c .::. createConnectionForLink' c nm userId enableNtfs
|
||||
{-# INLINE createConnectionForLink #-}
|
||||
|
||||
-- | Create or update user's contact connection short link
|
||||
setConnShortLink :: AgentClient -> NetworkRequestMode -> ConnId -> SConnectionMode c -> UserConnLinkData c -> Maybe CRClientData -> AE (ConnShortLink c)
|
||||
setConnShortLink c = withAgentEnv c .::. setConnShortLink' c
|
||||
@@ -942,6 +957,66 @@ newConn c nm userId enableNtfs checkNotices cMode linkData_ clientData pqInitKey
|
||||
<$> newRcvConnSrv c nm userId connId enableNtfs cMode linkData_ clientData pqInitKeys subMode srv
|
||||
`catchE` \e -> withStore' c (`deleteConnRecord` connId) >> throwE e
|
||||
|
||||
-- | Prepare connection link for contact mode (no network, no database).
|
||||
-- Generates all cryptographic material and returns the link that will be created.
|
||||
prepareConnectionLink' :: AgentClient -> UserId -> Maybe ByteString -> Bool -> Maybe CRClientData -> AM (C.KeyPairEd25519, CreatedConnLink 'CMContact, PreparedLinkParams)
|
||||
prepareConnectionLink' c userId linkEntityId checkNotices clientData = do
|
||||
g <- asks random
|
||||
plpSrvWithAuth@(ProtoServerWithAuth srv _) <- getSMPServer c userId
|
||||
when checkNotices $ checkClientNotices c plpSrvWithAuth
|
||||
AgentConfig {smpClientVRange, smpAgentVRange} <- asks config
|
||||
plpNonce@(C.CbNonce corrId) <- atomically $ C.randomCbNonce g
|
||||
sigKeys@(_, plpRootPrivKey) <- atomically $ C.generateKeyPair g
|
||||
plpQueueE2EKeys@(e2ePubKey, _) <- atomically $ C.generateKeyPair g
|
||||
let sndId = SMP.EntityId $ B.take 24 $ C.sha3_384 corrId
|
||||
qUri = SMPQueueUri smpClientVRange $ SMPQueueAddress srv sndId e2ePubKey (Just QMContact)
|
||||
connReq = CRContactUri $ ConnReqUriData SSSimplex smpAgentVRange [qUri] clientData
|
||||
(plpLinkKey, plpSignedFixedData) = SL.encodeSignFixedData sigKeys smpAgentVRange connReq linkEntityId
|
||||
ccLink = CCLink connReq $ Just $ CSLContact SLSServer CCTContact srv plpLinkKey
|
||||
params = PreparedLinkParams {plpNonce, plpQueueE2EKeys, plpLinkKey, plpRootPrivKey, plpSignedFixedData, plpSrvWithAuth}
|
||||
pure (sigKeys, ccLink, params)
|
||||
|
||||
-- | Create connection for prepared link (single network call).
|
||||
createConnectionForLink' :: AgentClient -> NetworkRequestMode -> UserId -> Bool -> CreatedConnLink 'CMContact -> PreparedLinkParams -> UserConnLinkData 'CMContact -> CR.InitialKeys -> SubscriptionMode -> AM ConnId
|
||||
createConnectionForLink' c nm userId enableNtfs (CCLink connReq _) PreparedLinkParams {plpNonce, plpQueueE2EKeys, plpLinkKey, plpRootPrivKey, plpSignedFixedData, plpSrvWithAuth} userLinkData pqInitKeys subMode = do
|
||||
g <- asks random
|
||||
AgentConfig {smpAgentVRange} <- asks config
|
||||
case pqInitKeys of
|
||||
CR.IKUsePQ -> throwE $ CMD PROHIBITED "createConnectionForLink"
|
||||
_ -> pure ()
|
||||
connId <- newConnNoQueues c userId enableNtfs SCMContact (CR.connPQEncryption pqInitKeys)
|
||||
let CRContactUri ConnReqUriData {crSmpQueues = SMPQueueUri _ SMPQueueAddress {senderId = sndId} :| _} = connReq
|
||||
md = SL.encodeSignUserData SCMContact plpRootPrivKey smpAgentVRange userLinkData
|
||||
linkData = (plpSignedFixedData, md)
|
||||
qd <- encryptContactLinkData g plpRootPrivKey plpLinkKey sndId linkData
|
||||
(_, qUri) <-
|
||||
createRcvQueue c nm userId connId plpSrvWithAuth enableNtfs subMode (Just plpNonce) qd plpQueueE2EKeys
|
||||
`catchE` \e -> withStore' c (`deleteConnRecord` connId) >> throwE e
|
||||
let SMPQueueUri _ SMPQueueAddress {senderId = actualSndId} = qUri
|
||||
unless (actualSndId == sndId) $ throwE $ INTERNAL "createConnectionForLink: sender ID mismatch"
|
||||
pure connId
|
||||
|
||||
-- | Encrypt signed link data for contact mode.
|
||||
encryptContactLinkData :: TVar ChaChaDRG -> C.PrivateKeyEd25519 -> LinkKey -> SMP.SenderId -> (ByteString, ByteString) -> AM ClntQueueReqData
|
||||
encryptContactLinkData g privSigKey linkKey sndId linkData = do
|
||||
let (linkId, k) = SL.contactShortLinkKdf linkKey
|
||||
srvData <- liftError id $ SL.encryptLinkData g k linkData
|
||||
pure $ CQRContact $ Just CQRData {linkKey, privSigKey, srvReq = (linkId, (sndId, srvData))}
|
||||
|
||||
-- | Shared helper: create receive queue and set up subscriptions.
|
||||
createRcvQueue :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> SMPServerWithAuth -> Bool -> SubscriptionMode -> Maybe C.CbNonce -> ClntQueueReqData -> C.KeyPairX25519 -> AM (RcvQueue, SMPQueueUri)
|
||||
createRcvQueue c nm userId connId srvWithAuth@(ProtoServerWithAuth srv _) enableNtfs subMode nonce_ qd e2eKeys = do
|
||||
AgentConfig {smpClientVRange = vr} <- asks config
|
||||
ntfServer_ <- if enableNtfs then newQueueNtfServer else pure Nothing
|
||||
(rq, qUri, tSess, sessId, serviceId_) <-
|
||||
newRcvQueue_ c nm userId connId srvWithAuth vr qd (isJust ntfServer_) subMode nonce_ e2eKeys
|
||||
`catchAllErrors` \e -> liftIO (print e) >> throwE e
|
||||
atomically $ incSMPServerStat c userId srv connCreated
|
||||
rq' <- withStore c $ \db -> updateNewConnRcv db connId rq subMode
|
||||
lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId serviceId_
|
||||
mapM_ (newQueueNtfSubscription c rq') ntfServer_
|
||||
pure (rq', qUri)
|
||||
|
||||
checkClientNotices :: AgentClient -> SMPServerWithAuth -> AM ()
|
||||
checkClientNotices AgentClient {clientNotices, presetServers} (ProtoServerWithAuth srv@(ProtocolServer {host}) _) = do
|
||||
notices <- readTVarIO clientNotices
|
||||
@@ -1018,7 +1093,7 @@ setConnShortLink' c nm connId cMode userLinkData clientData =
|
||||
sigKeys@(_, privSigKey) <- atomically $ C.generateKeyPair @'C.Ed25519 g
|
||||
let qUri = SMPQueueUri vr $ (rcvSMPQueueAddress rq) {queueMode = Just QMContact}
|
||||
connReq = CRContactUri $ ConnReqUriData SSSimplex smpAgentVRange [qUri] clientData
|
||||
(linkKey, linkData) = SL.encodeSignLinkData sigKeys smpAgentVRange connReq ud
|
||||
(linkKey, linkData) = SL.encodeSignLinkData sigKeys smpAgentVRange connReq Nothing ud
|
||||
(linkId, k) = SL.contactShortLinkKdf linkKey
|
||||
srvData <- liftError id $ SL.encryptLinkData g k linkData
|
||||
let slCreds = ShortLinkCreds linkId linkKey privSigKey Nothing (fst srvData)
|
||||
@@ -1105,25 +1180,15 @@ newRcvConnSrv c nm userId connId enableNtfs cMode userLinkData_ clientData pqIni
|
||||
case userLinkData_ of
|
||||
Just d -> do
|
||||
(nonce, qUri, cReq, qd) <- prepareLinkData d $ fst e2eKeys
|
||||
(rq, qUri') <- createRcvQueue (Just nonce) qd e2eKeys
|
||||
(rq, qUri') <- createRcvQueue c nm userId connId srvWithAuth enableNtfs subMode (Just nonce) qd e2eKeys
|
||||
ccLink <- connReqWithShortLink qUri cReq qUri' (shortLink rq)
|
||||
pure ccLink
|
||||
Nothing -> do
|
||||
let qd = case cMode of SCMContact -> CQRContact Nothing; SCMInvitation -> CQRMessaging Nothing
|
||||
(_rq, qUri) <- createRcvQueue Nothing qd e2eKeys
|
||||
(_rq, qUri) <- createRcvQueue c nm userId connId srvWithAuth enableNtfs subMode Nothing qd e2eKeys
|
||||
cReq <- createConnReq qUri
|
||||
pure $ CCLink cReq Nothing
|
||||
where
|
||||
createRcvQueue :: Maybe C.CbNonce -> ClntQueueReqData -> C.KeyPairX25519 -> AM (RcvQueue, SMPQueueUri)
|
||||
createRcvQueue nonce_ qd e2eKeys = do
|
||||
AgentConfig {smpClientVRange = vr} <- asks config
|
||||
ntfServer_ <- if enableNtfs then newQueueNtfServer else pure Nothing
|
||||
(rq, qUri, tSess, sessId, serviceId_) <- newRcvQueue_ c nm userId connId srvWithAuth vr qd (isJust ntfServer_) subMode nonce_ e2eKeys `catchAllErrors` \e -> liftIO (print e) >> throwE e
|
||||
atomically $ incSMPServerStat c userId srv connCreated
|
||||
rq' <- withStore c $ \db -> updateNewConnRcv db connId rq subMode
|
||||
lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId serviceId_
|
||||
mapM_ (newQueueNtfSubscription c rq') ntfServer_
|
||||
pure (rq', qUri)
|
||||
createConnReq :: SMPQueueUri -> AM (ConnectionRequestUri c)
|
||||
createConnReq qUri = do
|
||||
AgentConfig {smpAgentVRange, e2eEncryptVRange} <- asks config
|
||||
@@ -1147,12 +1212,9 @@ newRcvConnSrv c nm userId connId enableNtfs cMode userLinkData_ clientData pqIni
|
||||
qm = case cMode of SCMContact -> QMContact; SCMInvitation -> QMMessaging
|
||||
qUri = SMPQueueUri vr $ SMPQueueAddress srv sndId e2eDhKey (Just qm)
|
||||
connReq <- createConnReq qUri
|
||||
let (linkKey, linkData) = SL.encodeSignLinkData sigKeys smpAgentVRange connReq userLinkData
|
||||
let (linkKey, linkData) = SL.encodeSignLinkData sigKeys smpAgentVRange connReq Nothing userLinkData
|
||||
qd <- case cMode of
|
||||
SCMContact -> do
|
||||
let (linkId, k) = SL.contactShortLinkKdf linkKey
|
||||
srvData <- liftError id $ SL.encryptLinkData g k linkData
|
||||
pure $ CQRContact $ Just CQRData {linkKey, privSigKey, srvReq = (linkId, (sndId, srvData))}
|
||||
SCMContact -> encryptContactLinkData g privSigKey linkKey sndId linkData
|
||||
SCMInvitation -> do
|
||||
let k = SL.invShortLinkKdf linkKey
|
||||
srvData <- liftError id $ SL.encryptLinkData g k linkData
|
||||
@@ -1831,8 +1893,13 @@ runCommandProcessing c@AgentClient {subQ} connId server_ Worker {doWork} = do
|
||||
_ -> throwE $ CMD PROHIBITED "SWCH: not duplex"
|
||||
DEL -> withServer' . tryCommand $ deleteConnection' c NRMBackground connId >> notify OK
|
||||
AInternalCommand cmd -> case cmd of
|
||||
ICAckDel rId srvMsgId msgId -> withServer $ \srv -> tryWithLock "ICAckDel" $ ack srv rId srvMsgId >> withStore' c (\db -> deleteMsg db connId msgId)
|
||||
ICAck rId srvMsgId -> withServer $ \srv -> tryWithLock "ICAck" $ ack srv rId srvMsgId
|
||||
ICAckDel rId srvMsgId msgId -> withServer $ \srv ->
|
||||
tryCommand $ withConnLockNotify c connId "ICAckDel" $ do
|
||||
t_ <- ack srv rId srvMsgId
|
||||
withStore' c (\db -> deleteMsg db connId msgId)
|
||||
pure t_
|
||||
ICAck rId srvMsgId -> withServer $ \srv ->
|
||||
tryCommand $ withConnLockNotify c connId "ICAck" $ ack srv rId srvMsgId
|
||||
ICAllowSecure _rId senderKey -> withServer' . tryMoveableWithLock "ICAllowSecure" $ do
|
||||
(SomeConn _ conn, AcceptedConfirmation {senderConf, ownConnInfo}) <-
|
||||
withStore c $ \db -> runExceptT $ (,) <$> ExceptT (getConn db connId) <*> ExceptT (getAcceptedConfirmation db connId)
|
||||
@@ -2195,7 +2262,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} sq@SndQueue {userId, connId, server,
|
||||
cStats <- connectionStats c conn
|
||||
notify $ SWITCH QDSnd SPConfirmed cStats
|
||||
AM_QUSE_ -> pure ()
|
||||
AM_QTEST_ -> withConnLock c connId "runSmpQueueMsgDelivery AM_QTEST_" $ do
|
||||
AM_QTEST_ -> withConnLockNotify c connId "runSmpQueueMsgDelivery AM_QTEST_" $ do
|
||||
withStore' c $ \db -> setSndQueueStatus db sq Active
|
||||
SomeConn _ conn <- withStore c (`getConn` connId)
|
||||
case conn of
|
||||
@@ -2219,7 +2286,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} sq@SndQueue {userId, connId, server,
|
||||
let sqs'' = sq'' :| sqs'
|
||||
conn' = DuplexConnection cData' rqs sqs''
|
||||
cStats <- connectionStats c conn'
|
||||
notify $ SWITCH QDSnd SPCompleted cStats
|
||||
pure $ Just ("", connId, AEvt SAEConn $ SWITCH QDSnd SPCompleted cStats)
|
||||
_ -> internalErr msgId "sent QTEST: there is only one queue in connection"
|
||||
_ -> internalErr msgId "sent QTEST: queue not in connection or not replacing another queue"
|
||||
_ -> internalErr msgId "QTEST sent not in duplex connection"
|
||||
@@ -2251,7 +2318,9 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} sq@SndQueue {userId, connId, server,
|
||||
notifyDel msgId cmd = notify cmd >> delMsg msgId
|
||||
connError msgId = notifyDel msgId . ERR . (`CONN` "")
|
||||
qError msgId = notifyDel msgId . ERR . AGENT . A_QUEUE
|
||||
internalErr msgId = notifyDel msgId . ERR . INTERNAL
|
||||
internalErr msgId s = do
|
||||
delMsg msgId
|
||||
pure $ Just ("", connId, AEvt SAEConn $ ERR $ INTERNAL s)
|
||||
|
||||
retrySndOp :: AgentClient -> AM () -> AM ()
|
||||
retrySndOp c loop = do
|
||||
@@ -2261,17 +2330,31 @@ retrySndOp c loop = do
|
||||
atomically $ beginAgentOperation c AOSndNetwork
|
||||
loop
|
||||
|
||||
-- | Like 'withConnLock', but writes the returned 'ATransmission' to 'subQ'
|
||||
-- after releasing the lock, preventing deadlock with agentSubscriber.
|
||||
withConnLockNotify :: AgentClient -> ConnId -> Text -> AM (Maybe ATransmission) -> AM ()
|
||||
withConnLockNotify c connId name action = do
|
||||
t_ <- withConnLock c connId name action
|
||||
forM_ t_ $ atomically . writeTBQueue (subQ c)
|
||||
|
||||
ackMessage' :: AgentClient -> ConnId -> AgentMsgId -> Maybe MsgReceiptInfo -> AM ()
|
||||
ackMessage' c connId msgId rcptInfo_ = withConnLock c connId "ackMessage" $ do
|
||||
ackMessage' c connId msgId rcptInfo_ = withConnLockNotify c connId "ackMessage" $ do
|
||||
SomeConn _ conn <- withStore c (`getConn` connId)
|
||||
case conn of
|
||||
DuplexConnection {} -> ack >> sendRcpt conn >> del
|
||||
RcvConnection {} -> ack >> del
|
||||
DuplexConnection {} -> do
|
||||
t_ <- ack
|
||||
sendRcpt conn
|
||||
del
|
||||
pure t_
|
||||
RcvConnection {} -> do
|
||||
t_ <- ack
|
||||
del
|
||||
pure t_
|
||||
SndConnection {} -> throwE $ CONN SIMPLEX "ackMessage"
|
||||
ContactConnection {} -> throwE $ CMD PROHIBITED "ackMessage: ContactConnection"
|
||||
NewConnection _ -> throwE $ CMD PROHIBITED "ackMessage: NewConnection"
|
||||
where
|
||||
ack :: AM ()
|
||||
ack :: AM (Maybe ATransmission)
|
||||
ack = do
|
||||
-- the stored message was delivered via a specific queue, the rest failed to decrypt and were already acknowledged
|
||||
(rq, srvMsgId) <- withStore c $ \db -> setMsgUserAck db connId $ InternalId msgId
|
||||
@@ -2379,7 +2462,7 @@ synchronizeRatchet' c connId pqSupport' force = withConnLock c connId "synchroni
|
||||
| otherwise -> throwE $ CMD PROHIBITED "synchronizeRatchet: not allowed"
|
||||
_ -> throwE $ CMD PROHIBITED "synchronizeRatchet: not duplex"
|
||||
|
||||
ackQueueMessage :: AgentClient -> RcvQueue -> SMP.MsgId -> AM ()
|
||||
ackQueueMessage :: AgentClient -> RcvQueue -> SMP.MsgId -> AM (Maybe ATransmission)
|
||||
ackQueueMessage c rq@RcvQueue {userId, connId, server} srvMsgId = do
|
||||
atomically $ incSMPServerStat c userId server ackAttempts
|
||||
tryAllErrors (sendAck c rq srvMsgId) >>= \case
|
||||
@@ -2391,10 +2474,11 @@ ackQueueMessage c rq@RcvQueue {userId, connId, server} srvMsgId = do
|
||||
where
|
||||
sendMsgNtf stat = do
|
||||
atomically $ incSMPServerStat c userId server stat
|
||||
whenM (liftIO $ hasGetLock c rq) $ do
|
||||
atomically $ releaseGetLock c rq
|
||||
brokerTs_ <- eitherToMaybe <$> tryAllErrors (withStore c $ \db -> getRcvMsgBrokerTs db connId srvMsgId)
|
||||
atomically $ writeTBQueue (subQ c) ("", connId, AEvt SAEConn $ MSGNTF srvMsgId brokerTs_)
|
||||
ifM (liftIO $ hasGetLock c rq)
|
||||
(do atomically $ releaseGetLock c rq
|
||||
brokerTs_ <- eitherToMaybe <$> tryAllErrors (withStore c $ \db -> getRcvMsgBrokerTs db connId srvMsgId)
|
||||
pure $ Just ("", connId, AEvt SAEConn $ MSGNTF srvMsgId brokerTs_))
|
||||
(pure Nothing)
|
||||
|
||||
-- | Suspend SMP agent connection (OFF command) in Reader monad
|
||||
suspendConnection' :: AgentClient -> NetworkRequestMode -> ConnId -> AM ()
|
||||
@@ -2881,10 +2965,13 @@ getNextSMPServer c userId = getNextServer c userId storageSrvs
|
||||
{-# INLINE getNextSMPServer #-}
|
||||
|
||||
subscriber :: AgentClient -> AM' ()
|
||||
subscriber c@AgentClient {msgQ} = forever $ do
|
||||
subscriber c@AgentClient {msgQ, subQ} = run $ forever $ do
|
||||
t <- atomically $ readTBQueue msgQ
|
||||
agentOperationBracket c AORcvNetwork waitUntilActive $
|
||||
processSMPTransmissions c t
|
||||
where
|
||||
run a = a `catchOwn` \e -> notify $ CRITICAL True $ "Agent subscriber stopped: " <> show e
|
||||
notify err = atomically $ writeTBQueue subQ ("", "", AEvt SAEConn $ ERR err)
|
||||
|
||||
cleanupManager :: AgentClient -> AM' ()
|
||||
cleanupManager c@AgentClient {subQ} = do
|
||||
@@ -3214,7 +3301,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), THandlePar
|
||||
ackDel :: InternalId -> AM ACKd
|
||||
ackDel aId = enqueueCmd (ICAckDel rId srvMsgId aId) $> ACKd
|
||||
handleNotifyAck :: AM ACKd -> AM ACKd
|
||||
handleNotifyAck m = m `catchAllErrors` \e -> notify (ERR e) >> ack
|
||||
handleNotifyAck m = m `catchAllOwnErrors` \e -> notify (ERR e) >> ack
|
||||
SMP.END ->
|
||||
atomically (ifM (activeClientSession c tSess sessId) (removeSubscription c tSess connId rq $> True) (pure False))
|
||||
>>= notifyEnd
|
||||
|
||||
@@ -129,6 +129,7 @@ module Simplex.Messaging.Agent.Protocol
|
||||
ContactConnType (..),
|
||||
ShortLinkScheme (..),
|
||||
LinkKey (..),
|
||||
PreparedLinkParams (..),
|
||||
validateOwners,
|
||||
validateLinkOwners,
|
||||
sameConnReqContact,
|
||||
@@ -179,7 +180,7 @@ module Simplex.Messaging.Agent.Protocol
|
||||
where
|
||||
|
||||
import Control.Applicative (optional, (<|>))
|
||||
import Control.Exception (BlockedIndefinitelyOnSTM (..), fromException)
|
||||
import Control.Exception (BlockedIndefinitelyOnMVar (..), BlockedIndefinitelyOnSTM (..), fromException)
|
||||
import Data.Aeson (FromJSON (..), ToJSON (..), Value (..), (.:), (.:?))
|
||||
import qualified Data.Aeson as J'
|
||||
import qualified Data.Aeson.Encoding as JE
|
||||
@@ -1489,6 +1490,23 @@ newtype LinkKey = LinkKey ByteString -- sha3-256(fixed_data)
|
||||
|
||||
instance ToField LinkKey where toField (LinkKey s) = toField $ Binary s
|
||||
|
||||
-- | Parameters for creating a connection with a prepared link.
|
||||
data PreparedLinkParams = PreparedLinkParams
|
||||
{ -- | Correlation ID / determines sender ID
|
||||
plpNonce :: C.CbNonce,
|
||||
-- | Queue E2EE DH key pair
|
||||
plpQueueE2EKeys :: C.KeyPairX25519,
|
||||
-- | For encrypting link data
|
||||
plpLinkKey :: LinkKey,
|
||||
-- | Root signing key (for signing link data)
|
||||
plpRootPrivKey :: C.PrivateKeyEd25519,
|
||||
-- | smpEncode of FixedLinkData (includes linkEntityId)
|
||||
plpSignedFixedData :: ByteString,
|
||||
-- | Server with basic auth (not stored in link)
|
||||
plpSrvWithAuth :: SMPServerWithAuth
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
instance ConnectionModeI c => ToField (ConnectionLink c) where toField = toField . Binary . strEncode
|
||||
|
||||
instance (Typeable c, ConnectionModeI c) => FromField (ConnectionLink c) where fromField = blobFieldDecoder strDecode
|
||||
@@ -1824,7 +1842,7 @@ instance ConnectionModeI c => Encoding (FixedLinkData c) where
|
||||
smpEncode (agentVRange, rootKey, linkConnReq) <> maybe "" smpEncode linkEntityId
|
||||
smpP = do
|
||||
(agentVRange, rootKey, linkConnReq) <- smpP
|
||||
linkEntityId <- (smpP <|> pure Nothing) <* A.takeByteString -- ignoring tail for forward compatibility with the future link data encoding
|
||||
linkEntityId <- optional smpP <* A.takeByteString -- ignoring tail for forward compatibility with the future link data encoding
|
||||
pure FixedLinkData {agentVRange, rootKey, linkConnReq, linkEntityId}
|
||||
|
||||
instance ConnectionModeI c => Encoding (ConnLinkData c) where
|
||||
@@ -1987,7 +2005,9 @@ data AgentErrorType
|
||||
instance AnyError AgentErrorType where
|
||||
fromSomeException e = case fromException e of
|
||||
Just BlockedIndefinitelyOnSTM -> CRITICAL True "Thread blocked indefinitely in STM transaction"
|
||||
_ -> INTERNAL $ show e
|
||||
_ -> case fromException e of
|
||||
Just BlockedIndefinitelyOnMVar -> CRITICAL True "Thread blocked indefinitely on MVar"
|
||||
_ -> INTERNAL $ show e
|
||||
{-# INLINE fromSomeException #-}
|
||||
|
||||
-- | SMP agent protocol command or response error.
|
||||
|
||||
@@ -1,4 +1,11 @@
|
||||
module Simplex.Messaging.Agent.QueryString where
|
||||
module Simplex.Messaging.Agent.QueryString
|
||||
( QueryStringParams (..),
|
||||
QSPEscaping (..),
|
||||
queryParam,
|
||||
queryParamParser,
|
||||
queryParam_,
|
||||
queryParamStr,
|
||||
) where
|
||||
|
||||
import Data.Attoparsec.ByteString.Char8 (Parser)
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
|
||||
@@ -3,7 +3,32 @@
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE TemplateHaskell #-}
|
||||
|
||||
module Simplex.Messaging.Agent.Stats where
|
||||
module Simplex.Messaging.Agent.Stats
|
||||
( AgentSMPServerStats (..),
|
||||
AgentSMPServerStatsData (..),
|
||||
OptionalInt (..),
|
||||
AgentXFTPServerStats (..),
|
||||
AgentXFTPServerStatsData (..),
|
||||
AgentNtfServerStats (..),
|
||||
AgentNtfServerStatsData (..),
|
||||
AgentPersistedServerStats (..),
|
||||
OptionalMap (..),
|
||||
newAgentSMPServerStats,
|
||||
newAgentSMPServerStatsData,
|
||||
newAgentSMPServerStats',
|
||||
getAgentSMPServerStats,
|
||||
addSMPStatsData,
|
||||
newAgentXFTPServerStats,
|
||||
newAgentXFTPServerStatsData,
|
||||
newAgentXFTPServerStats',
|
||||
getAgentXFTPServerStats,
|
||||
addXFTPStatsData,
|
||||
newAgentNtfServerStats,
|
||||
newAgentNtfServerStatsData,
|
||||
newAgentNtfServerStats',
|
||||
getAgentNtfServerStats,
|
||||
addNtfStatsData,
|
||||
) where
|
||||
|
||||
import Data.Aeson (FromJSON (..), FromJSONKey, ToJSON (..))
|
||||
import qualified Data.Aeson.TH as J
|
||||
|
||||
@@ -14,7 +14,79 @@
|
||||
{-# LANGUAGE StandaloneDeriving #-}
|
||||
{-# OPTIONS_GHC -fno-warn-unticked-promoted-constructors #-}
|
||||
|
||||
module Simplex.Messaging.Agent.Store where
|
||||
module Simplex.Messaging.Agent.Store
|
||||
( RcvQueue,
|
||||
NewRcvQueue,
|
||||
StoredRcvQueue (..),
|
||||
RcvQueueSub (..),
|
||||
ClientNtfCreds (..),
|
||||
InvShortLink (..),
|
||||
SndQueue,
|
||||
NewSndQueue,
|
||||
StoredSndQueue (..),
|
||||
SMPQueueRec (..),
|
||||
SomeRcvQueue (..),
|
||||
ConnType (..),
|
||||
Connection' (..),
|
||||
Connection,
|
||||
SConnType (..),
|
||||
SomeConn' (..),
|
||||
SomeConn,
|
||||
SomeConnSub,
|
||||
ConnData (..),
|
||||
NoticeId,
|
||||
PendingCommand (..),
|
||||
AgentCmdType (..),
|
||||
AgentCommand (..),
|
||||
AgentCommandTag (..),
|
||||
InternalCommand (..),
|
||||
InternalCommandTag (..),
|
||||
NewConfirmation (..),
|
||||
AcceptedConfirmation (..),
|
||||
NewInvitation (..),
|
||||
Invitation (..),
|
||||
PrevExternalSndId,
|
||||
PrevRcvMsgHash,
|
||||
PrevSndMsgHash,
|
||||
RcvMsgData (..),
|
||||
RcvMsg (..),
|
||||
SndMsgData (..),
|
||||
SndMsgPrepData (..),
|
||||
SndMsg (..),
|
||||
PendingMsgData (..),
|
||||
PendingMsgPrepData (..),
|
||||
InternalRcvId (..),
|
||||
ExternalSndId,
|
||||
ExternalSndTs,
|
||||
BrokerId,
|
||||
BrokerTs,
|
||||
InternalSndId (..),
|
||||
MsgBase (..),
|
||||
InternalId (..),
|
||||
InternalTs,
|
||||
AsyncCmdId,
|
||||
StoreError (..),
|
||||
AnyStoreError (..),
|
||||
ServiceAssoc,
|
||||
createStore,
|
||||
rcvQueueSub,
|
||||
rcvSMPQueueAddress,
|
||||
canAbortRcvSwitch,
|
||||
findQ,
|
||||
removeQ,
|
||||
removeQP,
|
||||
sndAddress,
|
||||
findRQ,
|
||||
switchingRQ,
|
||||
updatedQs,
|
||||
toConnData,
|
||||
updateConnection,
|
||||
connType,
|
||||
ratchetSyncAllowed,
|
||||
ratchetSyncSendProhibited,
|
||||
agentCommandTag,
|
||||
internalCmdTag,
|
||||
) where
|
||||
|
||||
import Control.Exception (Exception (..))
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
|
||||
@@ -8,7 +8,12 @@
|
||||
{-# LANGUAGE StandaloneDeriving #-}
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
|
||||
module Simplex.Messaging.Agent.Store.Entity where
|
||||
module Simplex.Messaging.Agent.Store.Entity
|
||||
( DBStored (..),
|
||||
DBEntityId,
|
||||
DBEntityId' (..),
|
||||
)
|
||||
where
|
||||
|
||||
import Data.Aeson (FromJSON (..), ToJSON (..))
|
||||
import qualified Data.Aeson as J
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
module Simplex.Messaging.Agent.Store.Postgres.Options where
|
||||
module Simplex.Messaging.Agent.Store.Postgres.Options
|
||||
( DBOpts (..),
|
||||
) where
|
||||
|
||||
import Data.ByteString (ByteString)
|
||||
import Numeric.Natural
|
||||
|
||||
@@ -1,4 +1,10 @@
|
||||
module Simplex.Messaging.Agent.Store.SQLite.Util where
|
||||
module Simplex.Messaging.Agent.Store.SQLite.Util
|
||||
( SQLiteFunc,
|
||||
SQLiteFuncFinal,
|
||||
createStaticFunction,
|
||||
createStaticAggregate,
|
||||
mkSQLiteFunc,
|
||||
) where
|
||||
|
||||
import Control.Exception (SomeException, catch, mask_)
|
||||
import Data.ByteString (ByteString)
|
||||
|
||||
@@ -1,7 +1,11 @@
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Simplex.Messaging.Compression where
|
||||
module Simplex.Messaging.Compression
|
||||
( Compressed,
|
||||
maxLengthPassthrough,
|
||||
compressionLevel,
|
||||
) where
|
||||
|
||||
import qualified Codec.Compression.Zstd as Z1
|
||||
import Data.ByteString (ByteString)
|
||||
@@ -36,10 +40,12 @@ compress1 bs
|
||||
| B.length bs <= maxLengthPassthrough = Passthrough bs
|
||||
| otherwise = Compressed . Large $ Z1.compress compressionLevel bs
|
||||
|
||||
decompress1 :: Compressed -> Either String ByteString
|
||||
decompress1 = \case
|
||||
decompress1 :: Int -> Compressed -> Either String ByteString
|
||||
decompress1 limit = \case
|
||||
Passthrough bs -> Right bs
|
||||
Compressed (Large bs) -> case Z1.decompress bs of
|
||||
Z1.Error e -> Left e
|
||||
Z1.Skip -> Right mempty
|
||||
Z1.Decompress bs' -> Right bs'
|
||||
Compressed (Large bs) -> case Z1.decompressedSize bs of
|
||||
Just sz | sz <= limit -> case Z1.decompress bs of
|
||||
Z1.Error e -> Left e
|
||||
Z1.Skip -> Right mempty
|
||||
Z1.Decompress bs' -> Right bs'
|
||||
_ -> Left $ "compressed size not specified or exceeds " <> show limit
|
||||
|
||||
@@ -2,7 +2,12 @@
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
|
||||
module Simplex.Messaging.Crypto.SNTRUP761 where
|
||||
module Simplex.Messaging.Crypto.SNTRUP761
|
||||
( KEMHybridSecret (..),
|
||||
kcbDecrypt,
|
||||
kcbEncrypt,
|
||||
kemHybridSecret,
|
||||
) where
|
||||
|
||||
import Crypto.Hash (Digest, SHA3_256, hash)
|
||||
import Data.ByteArray (ScrubbedBytes)
|
||||
|
||||
@@ -1,7 +1,16 @@
|
||||
{-# LANGUAGE CPP #-}
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
|
||||
module Simplex.Messaging.Crypto.SNTRUP761.Bindings where
|
||||
module Simplex.Messaging.Crypto.SNTRUP761.Bindings
|
||||
( KEMPublicKey (..),
|
||||
KEMSecretKey,
|
||||
KEMCiphertext (..),
|
||||
KEMSharedKey (..),
|
||||
KEMKeyPair,
|
||||
sntrup761Keypair,
|
||||
sntrup761Enc,
|
||||
sntrup761Dec,
|
||||
) where
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import Crypto.Random (ChaChaDRG)
|
||||
|
||||
@@ -13,7 +13,9 @@ module Simplex.Messaging.Crypto.ShortLink
|
||||
( contactShortLinkKdf,
|
||||
invShortLinkKdf,
|
||||
encodeSignLinkData,
|
||||
encodeSignFixedData,
|
||||
encodeSignUserData,
|
||||
newOwnerAuth,
|
||||
encryptLinkData,
|
||||
encryptUserData,
|
||||
decryptLinkData,
|
||||
@@ -50,11 +52,16 @@ contactShortLinkKdf (LinkKey k) =
|
||||
invShortLinkKdf :: LinkKey -> C.SbKey
|
||||
invShortLinkKdf (LinkKey k) = C.unsafeSbKey $ C.hkdf "" k "SimpleXInvLink" 32
|
||||
|
||||
encodeSignLinkData :: ConnectionModeI c => C.KeyPairEd25519 -> VersionRangeSMPA -> ConnectionRequestUri c -> UserConnLinkData c -> (LinkKey, (ByteString, ByteString))
|
||||
encodeSignLinkData (rootKey, pk) agentVRange linkConnReq userData =
|
||||
let fd = smpEncode FixedLinkData {agentVRange, rootKey, linkConnReq, linkEntityId = Nothing}
|
||||
md = smpEncode $ connLinkData agentVRange userData
|
||||
in (LinkKey (C.sha3_256 fd), (encodeSign pk fd, encodeSign pk md))
|
||||
encodeSignLinkData :: forall c. ConnectionModeI c => C.KeyPairEd25519 -> VersionRangeSMPA -> ConnectionRequestUri c -> Maybe ByteString -> UserConnLinkData c -> (LinkKey, (ByteString, ByteString))
|
||||
encodeSignLinkData keys@(_, pk) agentVRange linkConnReq linkEntityId userData =
|
||||
let (linkKey, fd) = encodeSignFixedData keys agentVRange linkConnReq linkEntityId
|
||||
md = encodeSignUserData (sConnectionMode @c) pk agentVRange userData
|
||||
in (linkKey, (fd, md))
|
||||
|
||||
encodeSignFixedData :: ConnectionModeI c => C.KeyPairEd25519 -> VersionRangeSMPA -> ConnectionRequestUri c -> Maybe ByteString -> (LinkKey, ByteString)
|
||||
encodeSignFixedData (rootKey, pk) agentVRange linkConnReq linkEntityId =
|
||||
let fd = smpEncode FixedLinkData {agentVRange, rootKey, linkConnReq, linkEntityId}
|
||||
in (LinkKey (C.sha3_256 fd), encodeSign pk fd)
|
||||
|
||||
encodeSignUserData :: ConnectionModeI c => SConnectionMode c -> C.PrivateKeyEd25519 -> VersionRangeSMPA -> UserConnLinkData c -> ByteString
|
||||
encodeSignUserData _ pk agentVRange userLinkData =
|
||||
@@ -68,6 +75,14 @@ connLinkData vr = \case
|
||||
encodeSign :: C.PrivateKeyEd25519 -> ByteString -> ByteString
|
||||
encodeSign pk s = smpEncode (C.sign' pk s) <> s
|
||||
|
||||
-- | Generate a new owner key pair and create OwnerAuth signed by the authorizing key.
|
||||
-- ownerId is application-specific (e.g., MemberId in chat).
|
||||
newOwnerAuth :: TVar ChaChaDRG -> OwnerId -> C.PrivateKeyEd25519 -> IO (C.PrivateKeyEd25519, OwnerAuth)
|
||||
newOwnerAuth g ownerId signingKey = do
|
||||
(ownerKey, ownerPrivKey) <- atomically $ C.generateKeyPair @'C.Ed25519 g
|
||||
let authOwnerSig = C.sign' signingKey $ ownerId <> C.encodePubKey ownerKey
|
||||
pure (ownerPrivKey, OwnerAuth {ownerId, ownerKey, authOwnerSig})
|
||||
|
||||
encryptLinkData :: TVar ChaChaDRG -> C.SbKey -> (ByteString, ByteString) -> ExceptT AgentErrorType IO QueueLinkData
|
||||
encryptLinkData g k = bimapM (encrypt fixedDataPaddedLength) (encrypt userDataPaddedLength)
|
||||
where
|
||||
|
||||
@@ -24,6 +24,8 @@ import Data.Bits (shiftL, shiftR, (.|.))
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.ByteString.Internal (c2w, w2c)
|
||||
import Data.Text (Text)
|
||||
import Data.Text.Encoding (decodeUtf8', encodeUtf8)
|
||||
import Data.Int (Int64)
|
||||
import qualified Data.List.NonEmpty as L
|
||||
import Data.Time.Clock.System (SystemTime (..))
|
||||
@@ -156,6 +158,12 @@ smpEncodeList xs = B.cons (lenEncode $ length xs) . B.concat $ map smpEncode xs
|
||||
smpListP :: Encoding a => Parser [a]
|
||||
smpListP = (`A.count` smpP) =<< lenP
|
||||
|
||||
instance Encoding Text where
|
||||
smpEncode = smpEncode . encodeUtf8
|
||||
{-# INLINE smpEncode #-}
|
||||
smpP = either (fail . show) pure . decodeUtf8' =<< smpP
|
||||
{-# INLINE smpP #-}
|
||||
|
||||
instance Encoding String where
|
||||
smpEncode = smpEncode . B.pack
|
||||
{-# INLINE smpEncode #-}
|
||||
|
||||
@@ -4,7 +4,25 @@
|
||||
{-# LANGUAGE PatternSynonyms #-}
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
|
||||
module Simplex.Messaging.Notifications.Client where
|
||||
module Simplex.Messaging.Notifications.Client
|
||||
( NtfClient,
|
||||
NtfClientError,
|
||||
defaultNTFClientConfig,
|
||||
ntfRegisterToken,
|
||||
ntfVerifyToken,
|
||||
ntfCheckToken,
|
||||
ntfReplaceToken,
|
||||
ntfDeleteToken,
|
||||
ntfSetCronInterval,
|
||||
ntfCreateSubscription,
|
||||
ntfCreateSubscriptions,
|
||||
ntfCheckSubscription,
|
||||
ntfCheckSubscriptions,
|
||||
ntfDeleteSubscription,
|
||||
sendNtfCommand,
|
||||
okNtfCommand,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.Trans.Except
|
||||
|
||||
@@ -9,7 +9,36 @@
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
{-# LANGUAGE TypeFamilies #-}
|
||||
|
||||
module Simplex.Messaging.Notifications.Protocol where
|
||||
module Simplex.Messaging.Notifications.Protocol
|
||||
( NtfEntity (..),
|
||||
SNtfEntity (..),
|
||||
NtfEntityI (..),
|
||||
NtfCommandTag (..),
|
||||
NtfCmdTag (..),
|
||||
NtfRegCode (..),
|
||||
NewNtfEntity (..),
|
||||
ANewNtfEntity (..),
|
||||
NtfCommand (..),
|
||||
NtfCmd (..),
|
||||
NtfResponseTag (..),
|
||||
NtfResponse (..),
|
||||
SMPQueueNtf (..),
|
||||
PushProvider (..),
|
||||
DeviceToken (..),
|
||||
PNMessageData (..),
|
||||
NtfEntityId,
|
||||
NtfSubscriptionId,
|
||||
NtfTokenId,
|
||||
NtfSubStatus (..),
|
||||
NtfTknStatus (..),
|
||||
NTInvalidReason (..),
|
||||
encodePNMessages,
|
||||
pnMessagesP,
|
||||
subscribeNtfStatuses,
|
||||
allowTokenVerification,
|
||||
allowNtfSubCommands,
|
||||
checkEntity,
|
||||
) where
|
||||
|
||||
import Control.Applicative (optional, (<|>))
|
||||
import Data.Aeson (FromJSON (..), ToJSON (..), (.:), (.=))
|
||||
|
||||
@@ -15,7 +15,12 @@
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
|
||||
|
||||
module Simplex.Messaging.Notifications.Server where
|
||||
module Simplex.Messaging.Notifications.Server
|
||||
( runNtfServer,
|
||||
runNtfServerBlocking,
|
||||
restoreServerLastNtfs,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Concurrent (threadDelay)
|
||||
import Control.Concurrent.Async (mapConcurrently)
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Simplex.Messaging.Notifications.Server.Control where
|
||||
module Simplex.Messaging.Notifications.Server.Control
|
||||
( ControlProtocol (..),
|
||||
)
|
||||
where
|
||||
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import Simplex.Messaging.Encoding.String
|
||||
|
||||
@@ -7,7 +7,23 @@
|
||||
{-# LANGUAGE OverloadedLists #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Simplex.Messaging.Notifications.Server.Env where
|
||||
module Simplex.Messaging.Notifications.Server.Env
|
||||
( NtfServerConfig (..),
|
||||
NtfEnv (..),
|
||||
NtfSubscriber (..),
|
||||
SMPSubscriberVar,
|
||||
SMPSubscriber (..),
|
||||
NtfPushServer (..),
|
||||
NtfRequest (..),
|
||||
NtfServerClient (..),
|
||||
defaultInactiveClientExpiration,
|
||||
newNtfServerEnv,
|
||||
newNtfSubscriber,
|
||||
newNtfPushServer,
|
||||
newPushClient,
|
||||
getPushClient,
|
||||
newNtfServerClient,
|
||||
) where
|
||||
|
||||
import Control.Concurrent (ThreadId)
|
||||
import Control.Monad.Except
|
||||
|
||||
@@ -8,7 +8,10 @@
|
||||
{-# LANGUAGE PatternSynonyms #-}
|
||||
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
|
||||
|
||||
module Simplex.Messaging.Notifications.Server.Main where
|
||||
module Simplex.Messaging.Notifications.Server.Main
|
||||
( ntfServerCLI,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Logger.Simple (setLogLevel)
|
||||
import Control.Monad ((<$!>))
|
||||
|
||||
@@ -4,7 +4,14 @@
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
{-# OPTIONS_GHC -fno-warn-unrecognised-pragmas #-}
|
||||
|
||||
module Simplex.Messaging.Notifications.Server.Prometheus where
|
||||
module Simplex.Messaging.Notifications.Server.Prometheus
|
||||
( NtfServerMetrics (..),
|
||||
NtfRealTimeMetrics (..),
|
||||
NtfSMPWorkerMetrics (..),
|
||||
NtfSMPSubMetrics (..),
|
||||
rtsOptionsEnv,
|
||||
ntfPrometheusMetrics,
|
||||
) where
|
||||
|
||||
import Data.Int (Int64)
|
||||
import qualified Data.Map.Strict as M
|
||||
|
||||
@@ -8,7 +8,20 @@
|
||||
|
||||
{-# HLINT ignore "Use newtype instead of data" #-}
|
||||
|
||||
module Simplex.Messaging.Notifications.Server.Push.APNS where
|
||||
module Simplex.Messaging.Notifications.Server.Push.APNS
|
||||
( PushNotification (..),
|
||||
APNSNotification (..),
|
||||
APNSNotificationBody (..),
|
||||
APNSAlertBody (..),
|
||||
APNSPushClientConfig (..),
|
||||
PushProviderError (..),
|
||||
PushProviderClient,
|
||||
APNSErrorResponse (..),
|
||||
apnsProviderHost,
|
||||
defaultAPNSPushClientConfig,
|
||||
createAPNSPushClient,
|
||||
apnsPushProviderClient,
|
||||
) where
|
||||
|
||||
import Control.Exception (Exception)
|
||||
import Control.Logger.Simple
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Simplex.Messaging.Notifications.Server.Push.APNS.Internal where
|
||||
module Simplex.Messaging.Notifications.Server.Push.APNS.Internal
|
||||
( hApnsTopic,
|
||||
hApnsPushType,
|
||||
hApnsPriority,
|
||||
apnsJSONOptions,
|
||||
) where
|
||||
|
||||
import qualified Data.Aeson as J
|
||||
import qualified Data.CaseInsensitive as CI
|
||||
|
||||
@@ -2,7 +2,19 @@
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Simplex.Messaging.Notifications.Server.Stats where
|
||||
module Simplex.Messaging.Notifications.Server.Stats
|
||||
( NtfServerStats (..),
|
||||
NtfServerStatsData (..),
|
||||
StatsByServer,
|
||||
StatsByServerData (..),
|
||||
newNtfServerStats,
|
||||
getNtfServerStatsData,
|
||||
setNtfServerStats,
|
||||
getStatsByServer,
|
||||
setStatsByServer,
|
||||
incServerStat,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Applicative (optional, (<|>))
|
||||
import Control.Concurrent.STM
|
||||
|
||||
@@ -9,7 +9,26 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
|
||||
module Simplex.Messaging.Notifications.Server.Store where
|
||||
module Simplex.Messaging.Notifications.Server.Store
|
||||
( NtfSTMStore (..),
|
||||
NtfTknData (..),
|
||||
NtfSubData (..),
|
||||
TokenNtfMessageRecord (..),
|
||||
newNtfSTMStore,
|
||||
mkNtfTknData,
|
||||
ntfSubServer,
|
||||
stmGetNtfTokenIO,
|
||||
stmAddNtfToken,
|
||||
stmRemoveInactiveTokenRegistrations,
|
||||
stmRemoveTokenRegistration,
|
||||
stmDeleteNtfToken,
|
||||
stmGetNtfSubscriptionIO,
|
||||
stmAddNtfSubscription,
|
||||
stmDeleteNtfSubscription,
|
||||
stmStoreTokenLastNtf,
|
||||
stmSetNtfService,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import Control.Monad
|
||||
|
||||
@@ -2,7 +2,10 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
|
||||
module Simplex.Messaging.Notifications.Server.Store.Migrations where
|
||||
module Simplex.Messaging.Notifications.Server.Store.Migrations
|
||||
( ntfServerMigrations,
|
||||
)
|
||||
where
|
||||
|
||||
import Data.List (sortOn)
|
||||
import Data.Text (Text)
|
||||
|
||||
@@ -16,7 +16,43 @@
|
||||
{-# LANGUAGE TypeOperators #-}
|
||||
{-# OPTIONS_GHC -fno-warn-orphans -fno-warn-ambiguous-fields #-}
|
||||
|
||||
module Simplex.Messaging.Notifications.Server.Store.Postgres where
|
||||
module Simplex.Messaging.Notifications.Server.Store.Postgres
|
||||
( NtfPostgresStore (..),
|
||||
NtfEntityRec (..),
|
||||
mkNtfTknRec,
|
||||
newNtfDbStore,
|
||||
closeNtfDbStore,
|
||||
addNtfToken,
|
||||
replaceNtfToken,
|
||||
getNtfToken,
|
||||
findNtfTokenRegistration,
|
||||
deleteNtfToken,
|
||||
updateTknCronInterval,
|
||||
getUsedSMPServers,
|
||||
getNtfServiceCredentials,
|
||||
setNtfServiceCredentials,
|
||||
updateNtfServiceId,
|
||||
getServerNtfSubscriptions,
|
||||
findNtfSubscription,
|
||||
getNtfSubscription,
|
||||
mkNtfSubRec,
|
||||
updateTknStatus,
|
||||
setTknStatusConfirmed,
|
||||
setTokenActive,
|
||||
withPeriodicNtfTokens,
|
||||
updateTokenCronSentAt,
|
||||
addNtfSubscription,
|
||||
deleteNtfSubscription,
|
||||
updateSubStatus,
|
||||
updateSrvSubStatus,
|
||||
batchUpdateSrvSubStatus,
|
||||
batchUpdateSrvSubErrors,
|
||||
removeServiceAndAssociations,
|
||||
addTokenLastNtf,
|
||||
getEntityCounts,
|
||||
withDB',
|
||||
withClientDB,
|
||||
) where
|
||||
|
||||
import qualified Control.Exception as E
|
||||
import Control.Logger.Simple
|
||||
|
||||
@@ -4,7 +4,16 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
|
||||
module Simplex.Messaging.Notifications.Server.Store.Types where
|
||||
module Simplex.Messaging.Notifications.Server.Store.Types
|
||||
( NtfTknRec (..),
|
||||
NtfSubRec (..),
|
||||
ServerNtfSub,
|
||||
NtfAssociatedService,
|
||||
mkTknData,
|
||||
mkTknRec,
|
||||
mkSubData,
|
||||
mkSubRec,
|
||||
) where
|
||||
|
||||
import Control.Applicative (optional)
|
||||
import Control.Concurrent.STM
|
||||
|
||||
@@ -7,7 +7,18 @@
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
|
||||
module Simplex.Messaging.Notifications.Transport where
|
||||
module Simplex.Messaging.Notifications.Transport
|
||||
( NTFVersion,
|
||||
VersionRangeNTF,
|
||||
pattern VersionNTF,
|
||||
THandleNTF,
|
||||
invalidReasonNTFVersion,
|
||||
supportedClientNTFVRange,
|
||||
supportedServerNTFVRange,
|
||||
alpnSupportedNTFHandshakes,
|
||||
ntfServerHandshake,
|
||||
ntfClientHandshake,
|
||||
) where
|
||||
|
||||
import Control.Monad (forM)
|
||||
import Control.Monad.Except
|
||||
|
||||
@@ -4,7 +4,19 @@
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Simplex.Messaging.Notifications.Types where
|
||||
module Simplex.Messaging.Notifications.Types
|
||||
( NtfTknAction (..),
|
||||
NtfToken (..),
|
||||
NtfSubAction (..),
|
||||
NtfActionTs,
|
||||
NtfSubNTFAction (..),
|
||||
NtfSubSMPAction (..),
|
||||
NtfAgentSubStatus (..),
|
||||
NtfSubscription (..),
|
||||
newNtfToken,
|
||||
isDeleteNtfSubAction,
|
||||
newNtfSubscription,
|
||||
) where
|
||||
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import Data.Text.Encoding (decodeLatin1, encodeUtf8)
|
||||
|
||||
@@ -4,7 +4,20 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE PatternSynonyms #-}
|
||||
|
||||
module Simplex.Messaging.Parsers where
|
||||
module Simplex.Messaging.Parsers
|
||||
( parse,
|
||||
parseAll,
|
||||
parseE,
|
||||
parseE',
|
||||
parseRead1,
|
||||
parseString,
|
||||
fstToLower,
|
||||
dropPrefix,
|
||||
enumJSON,
|
||||
sumTypeJSON,
|
||||
defaultJSON,
|
||||
textP,
|
||||
) where
|
||||
|
||||
import Control.Monad.Trans.Except
|
||||
import qualified Data.Aeson as J
|
||||
|
||||
@@ -219,6 +219,7 @@ module Simplex.Messaging.Protocol
|
||||
-- * exports for tests
|
||||
CommandTag (..),
|
||||
BrokerMsgTag (..),
|
||||
checkParty,
|
||||
)
|
||||
where
|
||||
|
||||
|
||||
@@ -3,7 +3,9 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE TemplateHaskell #-}
|
||||
|
||||
module Simplex.Messaging.Protocol.Types where
|
||||
module Simplex.Messaging.Protocol.Types
|
||||
( ClientNotice (..),
|
||||
) where
|
||||
|
||||
import qualified Data.Aeson.TH as J
|
||||
import Data.Int (Int64)
|
||||
|
||||
@@ -11,7 +11,46 @@
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
|
||||
module Simplex.Messaging.Server.CLI where
|
||||
module Simplex.Messaging.Server.CLI
|
||||
( SignAlgorithm (..),
|
||||
X509Config (..),
|
||||
CertOptions (..),
|
||||
IniOptions (..),
|
||||
exitError,
|
||||
confirmOrExit,
|
||||
defaultX509Config,
|
||||
getCliCommand',
|
||||
simplexmqVersionCommit,
|
||||
simplexmqCommit,
|
||||
createServerX509,
|
||||
createServerX509_,
|
||||
certOptionsP,
|
||||
dbOptsP,
|
||||
startOptionsP,
|
||||
parseLogLevel,
|
||||
genOnline,
|
||||
warnCAPrivateKeyFile,
|
||||
mkIniOptions,
|
||||
strictIni,
|
||||
readStrictIni,
|
||||
readIniDefault,
|
||||
iniOnOff,
|
||||
strDecodeIni,
|
||||
withPrompt,
|
||||
onOffPrompt,
|
||||
onOff,
|
||||
settingIsOn,
|
||||
checkSavedFingerprint,
|
||||
iniTransports,
|
||||
iniDBOptions,
|
||||
printServerConfig,
|
||||
printServerTransports,
|
||||
printSMPServerConfig,
|
||||
deleteDirIfExists,
|
||||
printServiceInfo,
|
||||
clearDirIfExists,
|
||||
getEnvPath,
|
||||
) where
|
||||
|
||||
import Control.Logger.Simple (LogLevel (..))
|
||||
import Control.Monad
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Simplex.Messaging.Server.Control where
|
||||
module Simplex.Messaging.Server.Control
|
||||
( CPClientRole (..),
|
||||
ControlProtocol (..),
|
||||
) where
|
||||
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import Simplex.Messaging.Encoding.String
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
|
||||
module Simplex.Messaging.Server.Expiration where
|
||||
module Simplex.Messaging.Server.Expiration
|
||||
( ExpirationConfig (..),
|
||||
expireBeforeEpoch,
|
||||
showTTL,
|
||||
) where
|
||||
|
||||
import Control.Monad.IO.Class
|
||||
import Data.Int (Int64)
|
||||
|
||||
@@ -6,7 +6,19 @@
|
||||
{-# LANGUAGE StrictData #-}
|
||||
{-# LANGUAGE TemplateHaskell #-}
|
||||
|
||||
module Simplex.Messaging.Server.Information where
|
||||
module Simplex.Messaging.Server.Information
|
||||
( ServerInformation (..),
|
||||
ServerPublicConfig (..),
|
||||
ServerPublicInfo (..),
|
||||
ServerPersistenceMode (..),
|
||||
ServerConditions (..),
|
||||
HostingType (..),
|
||||
Entity (..),
|
||||
ServerContactAddress (..),
|
||||
PGPKey (..),
|
||||
emptyServerInfo,
|
||||
hasServerInfo,
|
||||
) where
|
||||
|
||||
import Data.Aeson (FromJSON (..), ToJSON (..))
|
||||
import qualified Data.Aeson.TH as J
|
||||
|
||||
@@ -15,7 +15,29 @@
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
|
||||
|
||||
module Simplex.Messaging.Server.Main where
|
||||
module Simplex.Messaging.Server.Main
|
||||
( EmbeddedWebParams (..),
|
||||
WebHttpsParams (..),
|
||||
CliCommand (..),
|
||||
StoreCmd (..),
|
||||
DatabaseTable (..),
|
||||
smpServerCLI,
|
||||
smpServerCLI_,
|
||||
#if defined(dbServerPostgres)
|
||||
importStoreLogToDatabase,
|
||||
importMessagesToDatabase,
|
||||
exportDatabaseToStoreLog,
|
||||
#endif
|
||||
newJournalMsgStore,
|
||||
storeMsgsJournalDir',
|
||||
getServerSourceCode,
|
||||
simplexmqSource,
|
||||
serverPublicInfo,
|
||||
validCountryValue,
|
||||
printSourceCode,
|
||||
cliCommandP,
|
||||
strParse,
|
||||
) where
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import Control.Exception (finally)
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE TemplateHaskell #-}
|
||||
|
||||
module Simplex.Messaging.Server.Main.GitCommit where
|
||||
module Simplex.Messaging.Server.Main.GitCommit
|
||||
( gitCommit,
|
||||
) where
|
||||
|
||||
import Language.Haskell.TH
|
||||
import System.Process
|
||||
|
||||
@@ -2,7 +2,18 @@
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Simplex.Messaging.Server.Main.Init where
|
||||
module Simplex.Messaging.Server.Main.Init
|
||||
( InitOptions (..),
|
||||
ServerPassword (..),
|
||||
defaultControlPort,
|
||||
defaultDBOpts,
|
||||
defaultDeletedTTL,
|
||||
iniFileContent,
|
||||
informationIniContent,
|
||||
iniDbOpts,
|
||||
optDisabled,
|
||||
optDisabled',
|
||||
) where
|
||||
|
||||
import Data.Int (Int64)
|
||||
import qualified Data.List.NonEmpty as L
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Simplex.Messaging.Server.MsgStore where
|
||||
module Simplex.Messaging.Server.MsgStore
|
||||
( MsgLogRecord (..),
|
||||
) where
|
||||
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Protocol (Message (..), RecipientId)
|
||||
|
||||
@@ -15,7 +15,25 @@
|
||||
|
||||
{-# HLINT ignore "Redundant multi-way if" #-}
|
||||
|
||||
module Simplex.Messaging.Server.MsgStore.Types where
|
||||
module Simplex.Messaging.Server.MsgStore.Types
|
||||
( MsgStoreClass (..),
|
||||
MSType (..),
|
||||
QSType (..),
|
||||
SMSType (..),
|
||||
SQSType (..),
|
||||
MessageStats (..),
|
||||
LoadedQueueCounts (..),
|
||||
newMessageStats,
|
||||
addQueue,
|
||||
getQueue,
|
||||
getQueueRec,
|
||||
getQueues,
|
||||
getQueueRecs,
|
||||
readQueueRec,
|
||||
withPeekMsgQueue,
|
||||
expireQueueMsgs,
|
||||
deleteExpireMsgs_,
|
||||
) where
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import Control.Monad
|
||||
|
||||
@@ -3,7 +3,14 @@
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Simplex.Messaging.Server.NtfStore where
|
||||
module Simplex.Messaging.Server.NtfStore
|
||||
( NtfStore (..),
|
||||
MsgNtf (..),
|
||||
NtfLogRecord (..),
|
||||
storeNtf,
|
||||
deleteNtfs,
|
||||
deleteExpiredNtfs,
|
||||
) where
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import Control.Monad (foldM)
|
||||
|
||||
@@ -3,7 +3,13 @@
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
{-# OPTIONS_GHC -fno-warn-unrecognised-pragmas #-}
|
||||
|
||||
module Simplex.Messaging.Server.Prometheus where
|
||||
module Simplex.Messaging.Server.Prometheus
|
||||
( ServerMetrics (..),
|
||||
RealTimeMetrics (..),
|
||||
RTSubscriberMetrics (..),
|
||||
rtsOptionsEnv,
|
||||
prometheusMetrics,
|
||||
) where
|
||||
|
||||
import Data.Int (Int64)
|
||||
import qualified Data.IntMap.Strict as IM
|
||||
|
||||
@@ -7,7 +7,13 @@
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Simplex.Messaging.Server.QueueStore where
|
||||
module Simplex.Messaging.Server.QueueStore
|
||||
( QueueRec (..),
|
||||
NtfCreds (..),
|
||||
ServiceRec (..),
|
||||
CertFingerprint,
|
||||
ServerEntityStatus (..),
|
||||
) where
|
||||
|
||||
import Control.Applicative (optional, (<|>))
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
module Simplex.Messaging.Server.QueueStore.Postgres.Config where
|
||||
module Simplex.Messaging.Server.QueueStore.Postgres.Config
|
||||
( PostgresStoreCfg (..),
|
||||
) where
|
||||
|
||||
import Data.Int (Int64)
|
||||
import Simplex.Messaging.Agent.Store.Postgres.Options (DBOpts)
|
||||
|
||||
@@ -2,7 +2,10 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
|
||||
module Simplex.Messaging.Server.QueueStore.Postgres.Migrations where
|
||||
module Simplex.Messaging.Server.QueueStore.Postgres.Migrations
|
||||
( serverMigrations,
|
||||
)
|
||||
where
|
||||
|
||||
import Data.List (sortOn)
|
||||
import Data.Text (Text)
|
||||
|
||||
@@ -3,7 +3,14 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE TemplateHaskell #-}
|
||||
|
||||
module Simplex.Messaging.Server.QueueStore.QueueInfo where
|
||||
module Simplex.Messaging.Server.QueueStore.QueueInfo
|
||||
( QueueInfo (..),
|
||||
QSub (..),
|
||||
QSubThread (..),
|
||||
MsgInfo (..),
|
||||
MsgType (..),
|
||||
QueueMode (..),
|
||||
) where
|
||||
|
||||
import qualified Data.Aeson as J
|
||||
import qualified Data.Aeson.TH as JQ
|
||||
|
||||
@@ -5,7 +5,12 @@
|
||||
{-# LANGUAGE TypeFamilies #-}
|
||||
{-# LANGUAGE TypeFamilyDependencies #-}
|
||||
|
||||
module Simplex.Messaging.Server.QueueStore.Types where
|
||||
module Simplex.Messaging.Server.QueueStore.Types
|
||||
( StoreQueueClass (..),
|
||||
QueueStoreClass (..),
|
||||
EntityCounts (..),
|
||||
withLoadedQueues,
|
||||
) where
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import Control.Monad
|
||||
|
||||
@@ -6,7 +6,40 @@
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
|
||||
module Simplex.Messaging.Server.Stats where
|
||||
module Simplex.Messaging.Server.Stats
|
||||
( ServerStats (..),
|
||||
ServerStatsData (..),
|
||||
PeriodStats (..),
|
||||
PeriodStatsData (..),
|
||||
PeriodStatCounts (..),
|
||||
ProxyStats (..),
|
||||
ProxyStatsData (..),
|
||||
ServiceStats (..),
|
||||
ServiceStatsData (..),
|
||||
TimeBuckets (..),
|
||||
newServerStats,
|
||||
getServerStatsData,
|
||||
setServerStats,
|
||||
newPeriodStats,
|
||||
newPeriodStatsData,
|
||||
getPeriodStatsData,
|
||||
setPeriodStats,
|
||||
periodStatDataCounts,
|
||||
periodStatCounts,
|
||||
updatePeriodStats,
|
||||
newProxyStats,
|
||||
newProxyStatsData,
|
||||
getProxyStatsData,
|
||||
getResetProxyStatsData,
|
||||
setProxyStats,
|
||||
newServiceStatsData,
|
||||
newServiceStats,
|
||||
getServiceStatsData,
|
||||
getResetServiceStatsData,
|
||||
setServiceStats,
|
||||
emptyTimeBuckets,
|
||||
updateTimeBuckets,
|
||||
) where
|
||||
|
||||
import Control.Applicative (optional, (<|>))
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
|
||||
@@ -7,7 +7,10 @@
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
|
||||
module Simplex.Messaging.Server.StoreLog.ReadWrite where
|
||||
module Simplex.Messaging.Server.StoreLog.ReadWrite
|
||||
( writeQueueStore,
|
||||
readQueueStore,
|
||||
) where
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import Control.Logger.Simple
|
||||
|
||||
@@ -2,7 +2,9 @@
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE KindSignatures #-}
|
||||
|
||||
module Simplex.Messaging.Server.StoreLog.Types where
|
||||
module Simplex.Messaging.Server.StoreLog.Types
|
||||
( StoreLog (..),
|
||||
) where
|
||||
|
||||
import System.IO (Handle, IOMode (..))
|
||||
|
||||
|
||||
@@ -1,7 +1,11 @@
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Simplex.Messaging.ServiceScheme where
|
||||
module Simplex.Messaging.ServiceScheme
|
||||
( ServiceScheme (..),
|
||||
SrvLoc (..),
|
||||
simplexChat,
|
||||
) where
|
||||
|
||||
import Control.Applicative ((<|>))
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
|
||||
@@ -2,7 +2,12 @@
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
|
||||
module Simplex.Messaging.Session where
|
||||
module Simplex.Messaging.Session
|
||||
( SessionVar (..),
|
||||
getSessVar,
|
||||
removeSessVar,
|
||||
tryReadSessVar,
|
||||
) where
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import Data.Time (UTCTime)
|
||||
|
||||
@@ -6,7 +6,15 @@
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
|
||||
module Simplex.Messaging.SystemTime where
|
||||
module Simplex.Messaging.SystemTime
|
||||
( RoundedSystemTime (..),
|
||||
SystemDate,
|
||||
SystemSeconds,
|
||||
getRoundedSystemTime,
|
||||
getSystemDate,
|
||||
getSystemSeconds,
|
||||
roundedToUTCTime,
|
||||
) where
|
||||
|
||||
import Data.Aeson (FromJSON, ToJSON)
|
||||
import Data.Int (Int64)
|
||||
|
||||
@@ -2,7 +2,15 @@
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Simplex.Messaging.Transport.Buffer where
|
||||
module Simplex.Messaging.Transport.Buffer
|
||||
( TBuffer (..),
|
||||
newTBuffer,
|
||||
peekBuffered,
|
||||
getBuffered,
|
||||
withTimedErr,
|
||||
getLnBuffered,
|
||||
trimCR,
|
||||
) where
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import qualified Control.Exception as E
|
||||
|
||||
@@ -1,7 +1,15 @@
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Simplex.Messaging.Transport.HTTP2 where
|
||||
module Simplex.Messaging.Transport.HTTP2
|
||||
( HTTP2Body (..),
|
||||
defaultHTTP2BufferSize,
|
||||
withHTTP2,
|
||||
http2TLSParams,
|
||||
getHTTP2Body,
|
||||
httpALPN,
|
||||
httpALPN11,
|
||||
) where
|
||||
|
||||
import qualified Control.Exception as E
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
|
||||
@@ -8,7 +8,19 @@
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
|
||||
module Simplex.Messaging.Transport.HTTP2.Client where
|
||||
module Simplex.Messaging.Transport.HTTP2.Client
|
||||
( HTTP2Client (..),
|
||||
HClient (..),
|
||||
HTTP2Response (..),
|
||||
HTTP2ClientConfig (..),
|
||||
HTTP2ClientError (..),
|
||||
defaultHTTP2ClientConfig,
|
||||
getHTTP2Client,
|
||||
getVerifiedHTTP2Client,
|
||||
attachHTTP2Client,
|
||||
closeHTTP2Client,
|
||||
sendRequest,
|
||||
) where
|
||||
|
||||
import Control.Concurrent.Async
|
||||
import Control.Exception (Handler (..), IOException, SomeAsyncException, SomeException)
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
{-# LANGUAGE MultiWayIf #-}
|
||||
|
||||
module Simplex.Messaging.Transport.HTTP2.File where
|
||||
module Simplex.Messaging.Transport.HTTP2.File
|
||||
( fileBlockSize,
|
||||
hReceiveFile,
|
||||
hSendFile,
|
||||
getFileChunk,
|
||||
) where
|
||||
|
||||
import Data.ByteString (ByteString)
|
||||
import qualified Data.ByteString as B
|
||||
|
||||
@@ -16,7 +16,7 @@ import Numeric.Natural (Natural)
|
||||
import Simplex.Messaging.Server.Expiration
|
||||
import Simplex.Messaging.Transport (ALPN, SessionId, TLS, closeConnection, tlsALPN, tlsUniq)
|
||||
import Simplex.Messaging.Transport.HTTP2
|
||||
import Simplex.Messaging.Transport.Server (ServerCredentials, TransportServerConfig (..), loadServerCredential, runTransportServer)
|
||||
import Simplex.Messaging.Transport.Server (SNICredentialUsed, ServerCredentials, TLSServerCredential (..), TransportServerConfig (..), loadServerCredential, newSocketState, runTransportServerState_)
|
||||
import Simplex.Messaging.Util (threadDelay')
|
||||
import UnliftIO (finally)
|
||||
import UnliftIO.Concurrent (forkIO, killThread)
|
||||
@@ -54,7 +54,7 @@ getHTTP2Server HTTP2ServerConfig {qSize, http2Port, bufferSize, bodyHeadSize, se
|
||||
started <- newEmptyTMVarIO
|
||||
reqQ <- newTBQueueIO qSize
|
||||
action <- async $
|
||||
runHTTP2Server started http2Port bufferSize serverSupported srvCreds transportConfig Nothing (const $ pure ()) $ \sessionId sessionALPN r sendResponse -> do
|
||||
runHTTP2Server started http2Port bufferSize serverSupported srvCreds Nothing transportConfig Nothing (const $ pure ()) $ \_sniUsed sessionId sessionALPN r sendResponse -> do
|
||||
reqBody <- getHTTP2Body r bodyHeadSize
|
||||
atomically $ writeTBQueue reqQ HTTP2Request {sessionId, sessionALPN, request = r, reqBody, sendResponse}
|
||||
void . atomically $ takeTMVar started
|
||||
@@ -63,24 +63,33 @@ getHTTP2Server HTTP2ServerConfig {qSize, http2Port, bufferSize, bodyHeadSize, se
|
||||
closeHTTP2Server :: HTTP2Server -> IO ()
|
||||
closeHTTP2Server = uninterruptibleCancel . action
|
||||
|
||||
runHTTP2Server :: TMVar Bool -> ServiceName -> BufferSize -> T.Supported -> T.Credential -> TransportServerConfig -> Maybe ExpirationConfig -> (SessionId -> IO ()) -> HTTP2ServerFunc -> IO ()
|
||||
runHTTP2Server started port bufferSize srvSupported srvCreds transportConfig expCfg_ clientFinished = runHTTP2ServerWith_ expCfg_ clientFinished bufferSize setup
|
||||
runHTTP2Server :: TMVar Bool -> ServiceName -> BufferSize -> T.Supported -> T.Credential -> Maybe T.Credential -> TransportServerConfig -> Maybe ExpirationConfig -> (SessionId -> IO ()) -> (SNICredentialUsed -> HTTP2ServerFunc) -> IO ()
|
||||
runHTTP2Server started port bufferSize srvSupported srvCreds httpCreds_ transportConfig expCfg_ clientFinished = runHTTP2ServerWith_ expCfg_ clientFinished bufferSize setup
|
||||
where
|
||||
setup = runTransportServer started port srvSupported srvCreds transportConfig
|
||||
setup handler = do
|
||||
ss <- newSocketState
|
||||
let combinedCreds = TLSServerCredential {credential = srvCreds, sniCredential = httpCreds_}
|
||||
runTransportServerState_ ss started port srvSupported combinedCreds transportConfig $ \_ -> handler
|
||||
|
||||
-- HTTP2 server can be run on both client and server TLS connections.
|
||||
runHTTP2ServerWith :: BufferSize -> ((TLS p -> IO ()) -> a) -> HTTP2ServerFunc -> a
|
||||
runHTTP2ServerWith = runHTTP2ServerWith_ Nothing (\_sessId -> pure ())
|
||||
runHTTP2ServerWith bufferSize tlsSetup http2Server =
|
||||
runHTTP2ServerWith_
|
||||
Nothing
|
||||
(\_sessId -> pure ())
|
||||
bufferSize
|
||||
(\handler -> tlsSetup $ \tls -> handler (False, tls))
|
||||
(const http2Server)
|
||||
|
||||
runHTTP2ServerWith_ :: Maybe ExpirationConfig -> (SessionId -> IO ()) -> BufferSize -> ((TLS p -> IO ()) -> a) -> HTTP2ServerFunc -> a
|
||||
runHTTP2ServerWith_ expCfg_ clientFinished bufferSize setup http2Server = setup $ \tls -> do
|
||||
runHTTP2ServerWith_ :: Maybe ExpirationConfig -> (SessionId -> IO ()) -> BufferSize -> (((SNICredentialUsed, TLS p) -> IO ()) -> a) -> (SNICredentialUsed -> HTTP2ServerFunc) -> a
|
||||
runHTTP2ServerWith_ expCfg_ clientFinished bufferSize setup http2Server = setup $ \(sniUsed, tls) -> do
|
||||
activeAt <- newTVarIO =<< getSystemTime
|
||||
tid_ <- mapM (forkIO . expireInactiveClient tls activeAt) expCfg_
|
||||
withHTTP2 bufferSize (run tls activeAt) (clientFinished $ tlsUniq tls) tls `finally` mapM_ killThread tid_
|
||||
withHTTP2 bufferSize (run sniUsed tls activeAt) (clientFinished $ tlsUniq tls) tls `finally` mapM_ killThread tid_
|
||||
where
|
||||
run tls activeAt cfg = H.run cfg $ \req _aux sendResp -> do
|
||||
run sniUsed tls activeAt cfg = H.run cfg $ \req _aux sendResp -> do
|
||||
getSystemTime >>= atomically . writeTVar activeAt
|
||||
http2Server (tlsUniq tls) (tlsALPN tls) req (`sendResp` [])
|
||||
http2Server sniUsed (tlsUniq tls) (tlsALPN tls) req (`sendResp` [])
|
||||
expireInactiveClient tls activeAt expCfg = loop
|
||||
where
|
||||
loop = do
|
||||
|
||||
@@ -4,7 +4,11 @@
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE TemplateHaskell #-}
|
||||
|
||||
module Simplex.Messaging.Transport.KeepAlive where
|
||||
module Simplex.Messaging.Transport.KeepAlive
|
||||
( KeepAliveOpts (..),
|
||||
defaultKeepAliveOpts,
|
||||
setSocketKeepAlive,
|
||||
) where
|
||||
|
||||
import qualified Data.Aeson.TH as J
|
||||
import Foreign.C (CInt (..))
|
||||
|
||||
@@ -11,6 +11,7 @@ module Simplex.Messaging.Transport.Server
|
||||
( TransportServerConfig (..),
|
||||
ServerCredentials (..),
|
||||
TLSServerCredential (..),
|
||||
SNICredentialUsed,
|
||||
AddHTTP,
|
||||
mkTransportServerConfig,
|
||||
runTransportServerState,
|
||||
@@ -62,6 +63,7 @@ data TransportServerConfig = TransportServerConfig
|
||||
{ logTLSErrors :: Bool,
|
||||
serverALPN :: Maybe [ALPN],
|
||||
askClientCert :: Bool,
|
||||
addCORSHeaders :: Bool,
|
||||
tlsSetupTimeout :: Int,
|
||||
transportTimeout :: Int
|
||||
}
|
||||
@@ -91,6 +93,7 @@ mkTransportServerConfig logTLSErrors serverALPN askClientCert =
|
||||
{ logTLSErrors,
|
||||
serverALPN,
|
||||
askClientCert,
|
||||
addCORSHeaders = False,
|
||||
tlsSetupTimeout = 60000000,
|
||||
transportTimeout = 40000000
|
||||
}
|
||||
@@ -274,9 +277,10 @@ paramsAskClientCert clientCert params =
|
||||
{ T.serverWantClientCert = True,
|
||||
T.serverHooks =
|
||||
(T.serverHooks params)
|
||||
{ T.onClientCertificate = \cc -> validateClientCertificate cc >>= \case
|
||||
Just reason -> T.CertificateUsageReject reason <$ atomically (tryPutTMVar clientCert Nothing)
|
||||
Nothing -> T.CertificateUsageAccept <$ atomically (tryPutTMVar clientCert $ Just cc)
|
||||
{ T.onClientCertificate = \cc ->
|
||||
validateClientCertificate cc >>= \case
|
||||
Just reason -> T.CertificateUsageReject reason <$ atomically (tryPutTMVar clientCert Nothing)
|
||||
Nothing -> T.CertificateUsageAccept <$ atomically (tryPutTMVar clientCert $ Just cc)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2,7 +2,12 @@
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Simplex.Messaging.Transport.Shared where
|
||||
module Simplex.Messaging.Transport.Shared
|
||||
( ChainCertificates (..),
|
||||
chainIdCaCerts,
|
||||
x509validate,
|
||||
takePeerCertChain,
|
||||
) where
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import qualified Control.Exception as E
|
||||
|
||||
+131
-19
@@ -3,8 +3,70 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
|
||||
module Simplex.Messaging.Util where
|
||||
module Simplex.Messaging.Util
|
||||
( AnyError (..),
|
||||
(<$?>),
|
||||
($>>),
|
||||
(<$$),
|
||||
(<$$>),
|
||||
raceAny_,
|
||||
bshow,
|
||||
tshow,
|
||||
maybeWord,
|
||||
liftError,
|
||||
liftError',
|
||||
liftEitherWith,
|
||||
ifM,
|
||||
whenM,
|
||||
unlessM,
|
||||
anyM,
|
||||
($>>=),
|
||||
mapME,
|
||||
bindRight,
|
||||
forME,
|
||||
mapAccumLM,
|
||||
packZipWith,
|
||||
tryWriteTBQueue,
|
||||
catchAll,
|
||||
catchAll_,
|
||||
tryAllErrors,
|
||||
tryAllErrors',
|
||||
catchAllErrors,
|
||||
catchAllErrors',
|
||||
catchThrow,
|
||||
allFinally,
|
||||
isOwnException,
|
||||
isAsyncCancellation,
|
||||
catchOwn',
|
||||
catchOwn,
|
||||
tryAllOwnErrors,
|
||||
tryAllOwnErrors',
|
||||
catchAllOwnErrors,
|
||||
catchAllOwnErrors',
|
||||
eitherToMaybe,
|
||||
listToEither,
|
||||
firstRow,
|
||||
maybeFirstRow,
|
||||
maybeFirstRow',
|
||||
firstRow',
|
||||
groupOn,
|
||||
groupOn',
|
||||
eqOn,
|
||||
groupAllOn,
|
||||
toChunks,
|
||||
safeDecodeUtf8,
|
||||
timeoutThrow,
|
||||
threadDelay',
|
||||
diffToMicroseconds,
|
||||
diffToMilliseconds,
|
||||
labelMyThread,
|
||||
atomicModifyIORef'_,
|
||||
encodeJSON,
|
||||
decodeJSON,
|
||||
traverseWithKey_,
|
||||
) where
|
||||
|
||||
import Control.Exception (AllocationLimitExceeded (..), AsyncException (..))
|
||||
import qualified Control.Exception as E
|
||||
import Control.Monad
|
||||
import Control.Monad.Except
|
||||
@@ -23,9 +85,9 @@ import Data.Int (Int64)
|
||||
import Data.List (groupBy, sortOn)
|
||||
import Data.List.NonEmpty (NonEmpty (..))
|
||||
import qualified Data.List.NonEmpty as L
|
||||
import Data.Maybe (listToMaybe)
|
||||
import Data.Map.Strict (Map)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (listToMaybe)
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
import Data.Text.Encoding (decodeUtf8With, encodeUtf8)
|
||||
@@ -98,7 +160,7 @@ anyM :: Monad m => [m Bool] -> m Bool
|
||||
anyM = foldM (\r a -> if r then pure r else (r ||) <$!> a) False
|
||||
{-# INLINE anyM #-}
|
||||
|
||||
infixl 1 $>>, $>>=
|
||||
infixl 1 $>>, $>>=
|
||||
|
||||
($>>=) :: (Monad m, Monad f, Traversable f) => m (f a) -> (a -> m (f b)) -> m (f b)
|
||||
f $>>= g = f >>= fmap join . mapM g
|
||||
@@ -120,15 +182,19 @@ forME :: (Monad m, Traversable t) => t (Either e a) -> (a -> m (Either e b)) ->
|
||||
forME = flip mapME
|
||||
{-# INLINE forME #-}
|
||||
|
||||
|
||||
-- | Monadic version of mapAccumL
|
||||
-- Copied from ghc-9.6.3 package: https://hackage.haskell.org/package/ghc-9.12.1/docs/GHC-Utils-Monad.html#v:mapAccumLM
|
||||
-- for backward compatibility with 8.10.7.
|
||||
mapAccumLM :: (Monad m, Traversable t)
|
||||
=> (acc -> x -> m (acc, y)) -- ^ combining function
|
||||
-> acc -- ^ initial state
|
||||
-> t x -- ^ inputs
|
||||
-> m (acc, t y) -- ^ final state, outputs
|
||||
mapAccumLM ::
|
||||
(Monad m, Traversable t) =>
|
||||
-- | combining function
|
||||
(acc -> x -> m (acc, y)) ->
|
||||
-- | initial state
|
||||
acc ->
|
||||
-- | inputs
|
||||
t x ->
|
||||
-- | final state, outputs
|
||||
m (acc, t y)
|
||||
{-# INLINE [1] mapAccumLM #-}
|
||||
-- INLINE pragma. mapAccumLM is called in inner loops. Like 'map',
|
||||
-- we inline it so that we can take advantage of knowing 'f'.
|
||||
@@ -137,26 +203,31 @@ mapAccumLM :: (Monad m, Traversable t)
|
||||
mapAccumLM f s = fmap swap . flip runStateT s . traverse f'
|
||||
where
|
||||
f' = StateT . (fmap . fmap) swap . flip f
|
||||
|
||||
{-# RULES "mapAccumLM/List" mapAccumLM = mapAccumLM_List #-}
|
||||
{-# RULES "mapAccumLM/NonEmpty" mapAccumLM = mapAccumLM_NonEmpty #-}
|
||||
|
||||
mapAccumLM_List
|
||||
:: Monad m
|
||||
=> (acc -> x -> m (acc, y))
|
||||
-> acc -> [x] -> m (acc, [y])
|
||||
mapAccumLM_List ::
|
||||
Monad m =>
|
||||
(acc -> x -> m (acc, y)) ->
|
||||
acc ->
|
||||
[x] ->
|
||||
m (acc, [y])
|
||||
{-# INLINE mapAccumLM_List #-}
|
||||
mapAccumLM_List f = go
|
||||
where
|
||||
go s (x : xs) = do
|
||||
(s1, x') <- f s x
|
||||
(s1, x') <- f s x
|
||||
(s2, xs') <- go s1 xs
|
||||
return (s2, x' : xs')
|
||||
return (s2, x' : xs')
|
||||
go s [] = return (s, [])
|
||||
|
||||
mapAccumLM_NonEmpty
|
||||
:: Monad m
|
||||
=> (acc -> x -> m (acc, y))
|
||||
-> acc -> NonEmpty x -> m (acc, NonEmpty y)
|
||||
mapAccumLM_NonEmpty ::
|
||||
Monad m =>
|
||||
(acc -> x -> m (acc, y)) ->
|
||||
acc ->
|
||||
NonEmpty x ->
|
||||
m (acc, NonEmpty y)
|
||||
{-# INLINE mapAccumLM_NonEmpty #-}
|
||||
mapAccumLM_NonEmpty f s (x :| xs) =
|
||||
[(s2, x' :| xs') | (s1, x') <- f s x, (s2, xs') <- mapAccumLM_List f s1 xs]
|
||||
@@ -223,6 +294,47 @@ allFinally :: (AnyError e, MonadUnliftIO m) => ExceptT e m a -> ExceptT e m b ->
|
||||
allFinally action final = tryAllErrors action >>= \r -> final >> except r
|
||||
{-# INLINE allFinally #-}
|
||||
|
||||
isOwnException :: E.SomeException -> Bool
|
||||
isOwnException e = case E.fromException e of
|
||||
Just StackOverflow -> True
|
||||
Just HeapOverflow -> True
|
||||
_ -> case E.fromException e of
|
||||
Just AllocationLimitExceeded -> True
|
||||
_ -> False
|
||||
{-# INLINE isOwnException #-}
|
||||
|
||||
isAsyncCancellation :: E.SomeException -> Bool
|
||||
isAsyncCancellation e = case E.fromException e of
|
||||
Just (_ :: SomeAsyncException) -> not $ isOwnException e
|
||||
Nothing -> False
|
||||
{-# INLINE isAsyncCancellation #-}
|
||||
|
||||
catchOwn' :: IO a -> (E.SomeException -> IO a) -> IO a
|
||||
catchOwn' action handleInternal = action `E.catch` \e -> if isAsyncCancellation e then E.throwIO e else handleInternal e
|
||||
{-# INLINE catchOwn' #-}
|
||||
|
||||
catchOwn :: MonadUnliftIO m => m a -> (E.SomeException -> m a) -> m a
|
||||
catchOwn action handleInternal =
|
||||
withRunInIO $ \run ->
|
||||
run action `E.catch` \e -> if isAsyncCancellation e then E.throwIO e else run (handleInternal e)
|
||||
{-# INLINE catchOwn #-}
|
||||
|
||||
tryAllOwnErrors :: (AnyError e, MonadUnliftIO m) => ExceptT e m a -> ExceptT e m (Either e a)
|
||||
tryAllOwnErrors action = ExceptT $ Right <$> runExceptT action `catchOwn` (pure . Left . fromSomeException)
|
||||
{-# INLINE tryAllOwnErrors #-}
|
||||
|
||||
tryAllOwnErrors' :: (AnyError e, MonadUnliftIO m) => ExceptT e m a -> m (Either e a)
|
||||
tryAllOwnErrors' action = runExceptT action `catchOwn` (pure . Left . fromSomeException)
|
||||
{-# INLINE tryAllOwnErrors' #-}
|
||||
|
||||
catchAllOwnErrors :: (AnyError e, MonadUnliftIO m) => ExceptT e m a -> (e -> ExceptT e m a) -> ExceptT e m a
|
||||
catchAllOwnErrors action handler = tryAllOwnErrors action >>= either handler pure
|
||||
{-# INLINE catchAllOwnErrors #-}
|
||||
|
||||
catchAllOwnErrors' :: (AnyError e, MonadUnliftIO m) => ExceptT e m a -> (e -> m a) -> m a
|
||||
catchAllOwnErrors' action handler = tryAllOwnErrors' action >>= either handler pure
|
||||
{-# INLINE catchAllOwnErrors' #-}
|
||||
|
||||
eitherToMaybe :: Either a b -> Maybe b
|
||||
eitherToMaybe = either (const Nothing) Just
|
||||
{-# INLINE eitherToMaybe #-}
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
module Simplex.Messaging.Version.Internal where
|
||||
module Simplex.Messaging.Version.Internal
|
||||
( Version (..),
|
||||
) where
|
||||
|
||||
import Data.Aeson (FromJSON (..), ToJSON (..))
|
||||
import Data.Word (Word16)
|
||||
|
||||
@@ -7,7 +7,15 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE PatternSynonyms #-}
|
||||
|
||||
module Simplex.RemoteControl.Discovery where
|
||||
module Simplex.RemoteControl.Discovery
|
||||
( getLocalAddress,
|
||||
mkLastLocalHost,
|
||||
preferAddress,
|
||||
startTLSServer,
|
||||
withSender,
|
||||
withListener,
|
||||
recvAnnounce,
|
||||
) where
|
||||
|
||||
import Control.Applicative ((<|>))
|
||||
import Control.Logger.Simple
|
||||
|
||||
@@ -8,7 +8,30 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE TemplateHaskell #-}
|
||||
|
||||
module Simplex.RemoteControl.Types where
|
||||
module Simplex.RemoteControl.Types
|
||||
( RCErrorType (..),
|
||||
RCPVersion,
|
||||
VersionRCP,
|
||||
VersionRangeRCP,
|
||||
IpProbe (..),
|
||||
RCHostHello (..),
|
||||
RCCtrlHello (..),
|
||||
RCHostPairing (..),
|
||||
KnownHostPairing (..),
|
||||
RCCtrlAddress (..),
|
||||
RCCtrlPairing (..),
|
||||
RCHostKeys (..),
|
||||
RCHostSession (..),
|
||||
HostSessKeys (..),
|
||||
RCCtrlSession (..),
|
||||
CtrlSessKeys (..),
|
||||
RCHostEncHello (..),
|
||||
RCCtrlEncHello (..),
|
||||
SessionCode,
|
||||
RCStepTMVar,
|
||||
currentRCPVersion,
|
||||
supportedRCPVRange,
|
||||
) where
|
||||
|
||||
import qualified Data.Aeson as J
|
||||
import qualified Data.Aeson.TH as JQ
|
||||
|
||||
Reference in New Issue
Block a user