mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-31 01:05:57 +00:00
SMP protocol: optimize batching transactions, remove Builder (#961)
* remove Builder * fewer chunks * remove lazy bytestrings * optimize * pad
This commit is contained in:
committed by
GitHub
parent
cd4329f2de
commit
7f7a77c4eb
@@ -103,7 +103,6 @@ library
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231225_failed_work_items
|
||||
Simplex.Messaging.Agent.TAsyncs
|
||||
Simplex.Messaging.Agent.TRcvQueues
|
||||
Simplex.Messaging.Builder
|
||||
Simplex.Messaging.Client
|
||||
Simplex.Messaging.Client.Agent
|
||||
Simplex.Messaging.Crypto
|
||||
|
||||
@@ -13,7 +13,7 @@ import Control.Monad
|
||||
import Control.Monad.Except
|
||||
import Crypto.Random (ChaChaDRG)
|
||||
import Data.Bifunctor (first)
|
||||
import qualified Data.ByteString.Builder as BB
|
||||
import Data.ByteString.Builder (Builder, byteString)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Int (Int64)
|
||||
@@ -25,7 +25,6 @@ import qualified Network.HTTP2.Client as H
|
||||
import Simplex.FileTransfer.Description (mb)
|
||||
import Simplex.FileTransfer.Protocol
|
||||
import Simplex.FileTransfer.Transport
|
||||
import Simplex.Messaging.Builder (Builder, builder)
|
||||
import Simplex.Messaging.Client
|
||||
( NetworkConfig (..),
|
||||
ProtocolClientError (..),
|
||||
@@ -142,7 +141,7 @@ sendXFTPCommand c@XFTPClient {http2Client = HTTP2Client {sessionId}} pKey fId cm
|
||||
xftpEncodeTransmission sessionId (Just pKey) ("", fId, FileCmd (sFileParty @p) cmd)
|
||||
sendXFTPTransmission c t chunkSpec_
|
||||
|
||||
sendXFTPTransmission :: XFTPClient -> Builder -> Maybe XFTPChunkSpec -> ExceptT XFTPClientError IO (FileResponse, HTTP2Body)
|
||||
sendXFTPTransmission :: XFTPClient -> ByteString -> Maybe XFTPChunkSpec -> ExceptT XFTPClientError IO (FileResponse, HTTP2Body)
|
||||
sendXFTPTransmission XFTPClient {config, http2Client = http2@HTTP2Client {sessionId}} t chunkSpec_ = do
|
||||
let req = H.requestStreaming N.methodPost "/" [] streamBody
|
||||
reqTimeout = (\XFTPChunkSpec {chunkSize} -> chunkTimeout config chunkSize) <$> chunkSpec_
|
||||
@@ -156,9 +155,9 @@ sendXFTPTransmission XFTPClient {config, http2Client = http2@HTTP2Client {sessio
|
||||
_ -> pure (r, body)
|
||||
Left e -> throwError $ PCEResponseError e
|
||||
where
|
||||
streamBody :: (BB.Builder -> IO ()) -> IO () -> IO ()
|
||||
streamBody :: (Builder -> IO ()) -> IO () -> IO ()
|
||||
streamBody send done = do
|
||||
send $ builder t
|
||||
send $ byteString t
|
||||
forM_ chunkSpec_ $ \XFTPChunkSpec {filePath, chunkOffset, chunkSize} ->
|
||||
withFile filePath ReadMode $ \h -> do
|
||||
hSeek h AbsoluteSeek $ fromIntegral chunkOffset
|
||||
|
||||
@@ -24,7 +24,6 @@ import Data.List.NonEmpty (NonEmpty (..))
|
||||
import Data.Maybe (isNothing)
|
||||
import Data.Type.Equality
|
||||
import Data.Word (Word32)
|
||||
import Simplex.Messaging.Builder (Builder)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Encoding.String
|
||||
@@ -49,8 +48,7 @@ import Simplex.Messaging.Protocol
|
||||
encodeTransmission,
|
||||
messageTagP,
|
||||
tDecodeParseValidate,
|
||||
tEncode,
|
||||
tEncodeBatch,
|
||||
tEncodeBatch1,
|
||||
tParse,
|
||||
)
|
||||
import Simplex.Messaging.Transport (SessionId, TransportError (..))
|
||||
@@ -395,7 +393,7 @@ checkParty' c = case testEquality (sFileParty @p) (sFileParty @p') of
|
||||
Just Refl -> Just c
|
||||
_ -> Nothing
|
||||
|
||||
xftpEncodeTransmission :: ProtocolEncoding e c => SessionId -> Maybe C.APrivateSignKey -> Transmission c -> Either TransportError Builder
|
||||
xftpEncodeTransmission :: ProtocolEncoding e c => SessionId -> Maybe C.APrivateSignKey -> Transmission c -> Either TransportError ByteString
|
||||
xftpEncodeTransmission sessionId pKey (corrId, fId, msg) = do
|
||||
let t = encodeTransmission currentXFTPVersion sessionId (corrId, fId, msg)
|
||||
xftpEncodeBatch1 $ signTransmission t
|
||||
@@ -404,10 +402,8 @@ xftpEncodeTransmission sessionId pKey (corrId, fId, msg) = do
|
||||
signTransmission t = ((`C.sign` t) <$> pKey, t)
|
||||
|
||||
-- this function uses batch syntax but puts only one transmission in the batch
|
||||
xftpEncodeBatch1 :: (Maybe C.ASignature, ByteString) -> Either TransportError Builder
|
||||
xftpEncodeBatch1 (sig, t) =
|
||||
let t' = tEncodeBatch 1 . encodeLarge $ tEncode (sig, t)
|
||||
in first (const TELargeMsg) $ C.pad' t' xftpBlockSize
|
||||
xftpEncodeBatch1 :: SentRawTransmission -> Either TransportError ByteString
|
||||
xftpEncodeBatch1 t = first (const TELargeMsg) $ C.pad (tEncodeBatch1 t) xftpBlockSize
|
||||
|
||||
xftpDecodeTransmission :: ProtocolEncoding e c => SessionId -> ByteString -> Either XFTPErrorType (SignedTransmission e c)
|
||||
xftpDecodeTransmission sessionId t = do
|
||||
|
||||
@@ -18,6 +18,7 @@ import Control.Monad.Except
|
||||
import Control.Monad.Reader
|
||||
import Data.Bifunctor (first)
|
||||
import qualified Data.ByteString.Base64.URL as B64
|
||||
import Data.ByteString.Builder (byteString)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Functor (($>))
|
||||
@@ -43,7 +44,6 @@ import Simplex.FileTransfer.Server.Stats
|
||||
import Simplex.FileTransfer.Server.Store
|
||||
import Simplex.FileTransfer.Server.StoreLog
|
||||
import Simplex.FileTransfer.Transport
|
||||
import Simplex.Messaging.Builder (builder)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import qualified Simplex.Messaging.Crypto.Lazy as LC
|
||||
import Simplex.Messaging.Encoding.String
|
||||
@@ -242,7 +242,7 @@ processRequest HTTP2Request {sessionId, reqBody = body@HTTP2Body {bodyHead}, sen
|
||||
send "padding error" -- TODO respond with BLOCK error?
|
||||
done
|
||||
Right t -> do
|
||||
send $ builder t
|
||||
send $ byteString t
|
||||
-- timeout sending file in the same way as receiving
|
||||
forM_ serverFile_ $ \ServerFile {filePath, fileSize, sbState} -> do
|
||||
withFile filePath ReadMode $ \h -> sendEncFile h send sbState (fromIntegral fileSize)
|
||||
|
||||
@@ -1,52 +0,0 @@
|
||||
{-# LANGUAGE StrictData #-}
|
||||
|
||||
module Simplex.Messaging.Builder
|
||||
( Builder (length, builder),
|
||||
byteString,
|
||||
lazyByteString,
|
||||
word16BE,
|
||||
char8,
|
||||
toLazyByteString,
|
||||
)
|
||||
where
|
||||
|
||||
import qualified Data.ByteString as B
|
||||
import qualified Data.ByteString.Builder as BB
|
||||
import qualified Data.ByteString.Lazy as LB
|
||||
import Data.Word (Word16)
|
||||
|
||||
|
||||
-- length-aware builder
|
||||
data Builder = Builder {length :: Int, builder :: BB.Builder}
|
||||
|
||||
instance Semigroup Builder where
|
||||
Builder l1 b1 <> Builder l2 b2 = Builder (l1 + l2) (b1 <> b2)
|
||||
{-# INLINE (<>) #-}
|
||||
|
||||
instance Monoid Builder where
|
||||
mempty = Builder 0 mempty
|
||||
{-# INLINE mempty #-}
|
||||
mconcat bs = Builder (sum ls) (mconcat bbs)
|
||||
where
|
||||
(ls, bbs) = foldr (\(Builder l b) ~(ls', bbs') -> (l : ls', b : bbs')) ([], []) bs
|
||||
{-# INLINE mconcat #-}
|
||||
|
||||
byteString :: B.ByteString -> Builder
|
||||
byteString s = Builder (B.length s) (BB.byteString s)
|
||||
{-# INLINE byteString #-}
|
||||
|
||||
lazyByteString :: LB.ByteString -> Builder
|
||||
lazyByteString s = Builder (fromIntegral $ LB.length s) (BB.lazyByteString s)
|
||||
{-# INLINE lazyByteString #-}
|
||||
|
||||
word16BE :: Word16 -> Builder
|
||||
word16BE = Builder 2 . BB.word16BE
|
||||
{-# INLINE word16BE #-}
|
||||
|
||||
char8 :: Char -> Builder
|
||||
char8 = Builder 1 . BB.char8
|
||||
{-# INLINE char8 #-}
|
||||
|
||||
toLazyByteString :: Builder -> LB.ByteString
|
||||
toLazyByteString = BB.toLazyByteString . builder
|
||||
{-# INLINE toLazyByteString #-}
|
||||
@@ -96,10 +96,7 @@ import Data.Maybe (fromMaybe)
|
||||
import Data.Time.Clock (UTCTime, getCurrentTime)
|
||||
import Network.Socket (ServiceName)
|
||||
import Numeric.Natural
|
||||
import Simplex.Messaging.Builder (Builder)
|
||||
import qualified Simplex.Messaging.Builder as BB
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Parsers (defaultJSON, dropPrefix, enumJSON)
|
||||
import Simplex.Messaging.Protocol
|
||||
@@ -136,7 +133,7 @@ data PClient err msg = PClient
|
||||
pingErrorCount :: TVar Int,
|
||||
clientCorrId :: TVar Natural,
|
||||
sentCommands :: TMap CorrId (Request err msg),
|
||||
sndQ :: TBQueue Builder,
|
||||
sndQ :: TBQueue ByteString,
|
||||
rcvQ :: TBQueue (NonEmpty (SignedTransmission err msg)),
|
||||
msgQ :: Maybe (TBQueue (ServerTransmission msg))
|
||||
}
|
||||
@@ -660,9 +657,11 @@ sendBatch c@ProtocolClient {client_ = PClient {sndQ}} b = do
|
||||
TBLargeTransmission Request {entityId} -> do
|
||||
putStrLn "send error: large message"
|
||||
pure [Response entityId $ Left $ PCETransportError TELargeMsg]
|
||||
TBTransmissions s n rs -> do
|
||||
when (n > 0) $ atomically $ writeTBQueue sndQ $ tEncodeBatch n s
|
||||
mapConcurrently (getResponse c) rs
|
||||
TBTransmissions s n rs
|
||||
| n > 0 -> do
|
||||
atomically $ writeTBQueue sndQ s
|
||||
mapConcurrently (getResponse c) rs
|
||||
| otherwise -> pure []
|
||||
TBTransmission s r -> do
|
||||
atomically $ writeTBQueue sndQ s
|
||||
(: []) <$> getResponse c r
|
||||
@@ -675,11 +674,11 @@ sendProtocolCommand c@ProtocolClient {client_ = PClient {sndQ}, batch, blockSize
|
||||
-- two separate "atomically" needed to avoid blocking
|
||||
sendRecv :: SentRawTransmission -> Request err msg -> IO (Either (ProtocolClientError err) msg)
|
||||
sendRecv t r
|
||||
| BB.length s > blockSize - 2 = pure $ Left $ PCETransportError TELargeMsg
|
||||
| B.length s > blockSize - 2 = pure $ Left $ PCETransportError TELargeMsg
|
||||
| otherwise = atomically (writeTBQueue sndQ s) >> response <$> getResponse c r
|
||||
where
|
||||
s
|
||||
| batch = tEncodeBatch 1 . encodeLarge $ tEncode t
|
||||
| batch = tEncodeBatch1 t
|
||||
| otherwise = tEncode t
|
||||
|
||||
-- TODO switch to timeout or TimeManager that supports Int64
|
||||
|
||||
@@ -140,7 +140,6 @@ module Simplex.Messaging.Crypto
|
||||
|
||||
-- * Message padding / un-padding
|
||||
pad,
|
||||
pad',
|
||||
unPad,
|
||||
|
||||
-- * X509 Certificates
|
||||
@@ -206,8 +205,6 @@ import Database.SQLite.Simple.FromField (FromField (..))
|
||||
import Database.SQLite.Simple.ToField (ToField (..))
|
||||
import GHC.TypeLits (ErrorMessage (..), KnownNat, Nat, TypeError, natVal, type (+))
|
||||
import Network.Transport.Internal (decodeWord16, encodeWord16)
|
||||
import Simplex.Messaging.Builder (Builder, byteString, word16BE)
|
||||
import qualified Simplex.Messaging.Builder as BB
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Parsers (blobFieldDecoder, parseAll, parseString)
|
||||
@@ -922,14 +919,6 @@ pad msg paddedLen
|
||||
len = B.length msg
|
||||
padLen = paddedLen - len - 2
|
||||
|
||||
pad' :: Builder -> Int -> Either CryptoError Builder
|
||||
pad' msg paddedLen
|
||||
| len <= maxMsgLen && padLen >= 0 = Right $ word16BE (fromIntegral len) <> msg <> byteString (B.replicate padLen '#')
|
||||
| otherwise = Left CryptoLargeMsgError
|
||||
where
|
||||
len = BB.length msg
|
||||
padLen = paddedLen - len - 2
|
||||
|
||||
unPad :: ByteString -> Either CryptoError ByteString
|
||||
unPad padded
|
||||
| B.length lenWrd == 2 && B.length rest >= len = Right $ B.take len rest
|
||||
|
||||
@@ -11,7 +11,6 @@ module Simplex.Messaging.Encoding
|
||||
( Encoding (..),
|
||||
Tail (..),
|
||||
Large (..),
|
||||
encodeLarge,
|
||||
_smpP,
|
||||
smpEncodeList,
|
||||
smpListP,
|
||||
@@ -30,8 +29,6 @@ import qualified Data.List.NonEmpty as L
|
||||
import Data.Time.Clock.System (SystemTime (..))
|
||||
import Data.Word (Word16, Word32)
|
||||
import Network.Transport.Internal (decodeWord16, decodeWord32, encodeWord16, encodeWord32)
|
||||
import Simplex.Messaging.Builder (Builder, word16BE)
|
||||
import qualified Simplex.Messaging.Builder as BB
|
||||
import Simplex.Messaging.Parsers (parseAll)
|
||||
import Simplex.Messaging.Util ((<$?>))
|
||||
|
||||
@@ -141,10 +138,6 @@ instance Encoding Large where
|
||||
Large <$> A.take len
|
||||
{-# INLINE smpP #-}
|
||||
|
||||
encodeLarge :: Builder -> Builder
|
||||
encodeLarge s = word16BE (fromIntegral $ BB.length s) <> s
|
||||
{-# INLINE encodeLarge #-}
|
||||
|
||||
instance Encoding SystemTime where
|
||||
smpEncode = smpEncode . systemSeconds
|
||||
{-# INLINE smpEncode #-}
|
||||
|
||||
@@ -369,7 +369,7 @@ receive th NtfServerClient {rcvQ, sndQ, rcvActiveAt} = forever $ do
|
||||
send :: Transport c => THandle c -> NtfServerClient -> IO ()
|
||||
send h@THandle {thVersion = v} NtfServerClient {sndQ, sessionId, sndActiveAt} = forever $ do
|
||||
t <- atomically $ readTBQueue sndQ
|
||||
void . liftIO $ tPut h Nothing [(Nothing, encodeTransmission v sessionId t)]
|
||||
void . liftIO $ tPut h [(Nothing, encodeTransmission v sessionId t)]
|
||||
atomically . writeTVar sndActiveAt =<< liftIO getSystemTime
|
||||
|
||||
-- instance Show a => Show (TVar a) where
|
||||
|
||||
@@ -144,7 +144,7 @@ module Simplex.Messaging.Protocol
|
||||
tParse,
|
||||
tDecodeParseValidate,
|
||||
tEncode,
|
||||
tEncodeBatch,
|
||||
tEncodeBatch1,
|
||||
batchTransmissions,
|
||||
batchTransmissions',
|
||||
|
||||
@@ -155,7 +155,6 @@ module Simplex.Messaging.Protocol
|
||||
where
|
||||
|
||||
import Control.Applicative (optional, (<|>))
|
||||
import Control.Concurrent (threadDelay)
|
||||
import Control.Monad.Except
|
||||
import Data.Aeson (FromJSON (..), ToJSON (..))
|
||||
import qualified Data.Aeson.TH as J
|
||||
@@ -163,8 +162,6 @@ import Data.Attoparsec.ByteString.Char8 (Parser, (<?>))
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import qualified Data.ByteString.Lazy.Char8 as LB
|
||||
import qualified Data.ByteString.Lazy.Internal as LB
|
||||
import Data.Char (isPrint, isSpace)
|
||||
import Data.Constraint (Dict (..))
|
||||
import Data.Functor (($>))
|
||||
@@ -177,8 +174,6 @@ import Data.Time.Clock.System (SystemTime (..))
|
||||
import Data.Type.Equality
|
||||
import GHC.TypeLits (ErrorMessage (..), TypeError, type (+))
|
||||
import Network.Socket (HostName, ServiceName)
|
||||
import Simplex.Messaging.Builder (Builder, char8, lazyByteString)
|
||||
import qualified Simplex.Messaging.Builder as BB
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Encoding.String
|
||||
@@ -1288,16 +1283,16 @@ instance Encoding CommandError where
|
||||
_ -> fail "bad command error type"
|
||||
|
||||
-- | Send signed SMP transmission to TCP transport.
|
||||
tPut :: Transport c => THandle c -> Maybe Int -> NonEmpty SentRawTransmission -> IO [Either TransportError ()]
|
||||
tPut th delay_ = fmap concat . mapM tPutBatch . batchTransmissions (batch th) (blockSize th)
|
||||
tPut :: Transport c => THandle c -> NonEmpty SentRawTransmission -> IO [Either TransportError ()]
|
||||
tPut th = fmap concat . mapM tPutBatch . batchTransmissions (batch th) (blockSize th)
|
||||
where
|
||||
tPutBatch :: TransportBatch () -> IO [Either TransportError ()]
|
||||
tPutBatch = \case
|
||||
TBLargeTransmission _ -> [Left TELargeMsg] <$ putStrLn "tPut error: large message"
|
||||
TBTransmissions s n _ -> replicate n <$> (tPutLog th (tEncodeBatch n s) <* mapM_ threadDelay delay_)
|
||||
TBTransmissions s n _ -> replicate n <$> (tPutLog th s)
|
||||
TBTransmission s _ -> (: []) <$> tPutLog th s
|
||||
|
||||
tPutLog :: Transport c => THandle c -> Builder -> IO (Either TransportError ())
|
||||
tPutLog :: Transport c => THandle c -> ByteString -> IO (Either TransportError ())
|
||||
tPutLog th s = do
|
||||
r <- tPutBlock th s
|
||||
case r of
|
||||
@@ -1305,8 +1300,8 @@ tPutLog th s = do
|
||||
_ -> pure ()
|
||||
pure r
|
||||
|
||||
-- Builder in TBTransmissions does not include byte with transmissions count, it is added by tEncodeBatch
|
||||
data TransportBatch r = TBTransmissions Builder Int [r] | TBTransmission Builder r | TBLargeTransmission r
|
||||
-- ByteString in TBTransmissions includes byte with transmissions count
|
||||
data TransportBatch r = TBTransmissions ByteString Int [r] | TBTransmission ByteString r | TBLargeTransmission r
|
||||
|
||||
batchTransmissions :: Bool -> Int -> NonEmpty SentRawTransmission -> [TransportBatch ()]
|
||||
batchTransmissions batch bSize = batchTransmissions' batch bSize . L.map (,())
|
||||
@@ -1314,35 +1309,43 @@ batchTransmissions batch bSize = batchTransmissions' batch bSize . L.map (,())
|
||||
-- | encodes and batches transmissions into blocks,
|
||||
batchTransmissions' :: forall r. Bool -> Int -> NonEmpty (SentRawTransmission, r) -> [TransportBatch r]
|
||||
batchTransmissions' batch bSize
|
||||
| batch = addBatch . foldr addTransmission ([], mempty, 0, [])
|
||||
| batch = addBatch . foldr addTransmission ([], 0, 0, [], [])
|
||||
| otherwise = map mkBatch1 . L.toList
|
||||
where
|
||||
mkBatch1 :: (SentRawTransmission, r) -> TransportBatch r
|
||||
mkBatch1 (t, r)
|
||||
-- 2 bytes are reserved for pad size
|
||||
| BB.length s <= bSize - 2 = TBTransmission s r
|
||||
| B.length s <= bSize - 2 = TBTransmission s r
|
||||
| otherwise = TBLargeTransmission r
|
||||
where
|
||||
s = tEncode t
|
||||
addTransmission :: (SentRawTransmission, r) -> ([TransportBatch r], Builder, Int, [r]) -> ([TransportBatch r], Builder, Int, [r])
|
||||
addTransmission (t, r) acc@(bs, b, n, rs)
|
||||
-- 3 = 2 bytes reserved for pad size + 1 for transmission count
|
||||
| len + BB.length b <= bSize - 3 && n < 255 = (bs, s <> b, 1 + n, r : rs)
|
||||
| len <= bSize - 3 = (addBatch acc, s, 1, [r])
|
||||
| otherwise = (TBLargeTransmission r : addBatch acc, mempty, 0, [])
|
||||
-- 3 = 2 bytes reserved for pad size + 1 for transmission count
|
||||
bSize' = bSize - 3
|
||||
addTransmission :: (SentRawTransmission, r) -> ([TransportBatch r], Int, Int, [ByteString], [r]) -> ([TransportBatch r], Int, Int, [ByteString], [r])
|
||||
addTransmission (t, r) acc@(bs, len, n, ss, rs)
|
||||
| len' <= bSize' && n < 255 = (bs, len', 1 + n, s : ss, r : rs)
|
||||
| sLen <= bSize' = (addBatch acc, sLen, 1, [s], [r])
|
||||
| otherwise = (TBLargeTransmission r : addBatch acc, 0, 0, [], [])
|
||||
where
|
||||
s = encodeLarge $ tEncode t
|
||||
len = BB.length s
|
||||
addBatch :: ([TransportBatch r], Builder, Int, [r]) -> [TransportBatch r]
|
||||
addBatch (bs, b, n, rs) = if n == 0 then bs else TBTransmissions b n rs : bs
|
||||
s = tEncodeForBatch t
|
||||
sLen = B.length s
|
||||
len' = len + sLen
|
||||
addBatch :: ([TransportBatch r], Int, Int, [ByteString], [r]) -> [TransportBatch r]
|
||||
addBatch (bs, _len, n, ss, rs) = if n == 0 then bs else TBTransmissions b n rs : bs
|
||||
where
|
||||
b = B.concat $ B.singleton (lenEncode n) : ss
|
||||
|
||||
tEncode :: SentRawTransmission -> Builder
|
||||
tEncode (sig, t) = lazyByteString $ LB.chunk (smpEncode $ C.signatureBytes sig) (LB.fromStrict t)
|
||||
tEncode :: SentRawTransmission -> ByteString
|
||||
tEncode (sig, t) = smpEncode (C.signatureBytes sig) <> t
|
||||
{-# INLINE tEncode #-}
|
||||
|
||||
tEncodeBatch :: Int -> Builder -> Builder
|
||||
tEncodeBatch n s = char8 (lenEncode n) <> s
|
||||
{-# INLINE tEncodeBatch #-}
|
||||
tEncodeForBatch :: SentRawTransmission -> ByteString
|
||||
tEncodeForBatch = smpEncode . Large . tEncode
|
||||
{-# INLINE tEncodeForBatch #-}
|
||||
|
||||
tEncodeBatch1 :: SentRawTransmission -> ByteString
|
||||
tEncodeBatch1 t = lenEncode 1 `B.cons` tEncodeForBatch t
|
||||
{-# INLINE tEncodeBatch1 #-}
|
||||
|
||||
encodeTransmission :: ProtocolEncoding e c => Version -> ByteString -> Transmission c -> ByteString
|
||||
encodeTransmission v sessionId (CorrId corrId, queueId, command) =
|
||||
|
||||
@@ -439,7 +439,7 @@ send h@THandle {thVersion = v} Client {sndQ, sessionId, sndActiveAt} = do
|
||||
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " send"
|
||||
forever $ do
|
||||
ts <- atomically $ L.sortWith tOrder <$> readTBQueue sndQ
|
||||
void . liftIO . tPut h Nothing $ L.map ((Nothing,) . encodeTransmission v sessionId) ts
|
||||
void . liftIO . tPut h $ L.map ((Nothing,) . encodeTransmission v sessionId) ts
|
||||
atomically . writeTVar sndActiveAt =<< liftIO getSystemTime
|
||||
where
|
||||
tOrder :: Transmission BrokerMsg -> Int
|
||||
|
||||
@@ -78,7 +78,6 @@ import Network.Socket
|
||||
import qualified Network.TLS as T
|
||||
import qualified Network.TLS.Extra as TE
|
||||
import qualified Paths_simplexmq as SMQ
|
||||
import Simplex.Messaging.Builder (Builder, byteString, toLazyByteString)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Parsers (dropPrefix, parse, parseRead1, sumTypeJSON)
|
||||
@@ -135,9 +134,6 @@ class Transport c where
|
||||
-- | Write bytes to connection
|
||||
cPut :: c -> ByteString -> IO ()
|
||||
|
||||
-- | Write bytes to connection
|
||||
cPut' :: c -> LB.ByteString -> IO ()
|
||||
|
||||
-- | Receive ByteString from connection, allowing LF or CRLF termination.
|
||||
getLn :: c -> IO ByteString
|
||||
|
||||
@@ -221,11 +217,8 @@ instance Transport TLS where
|
||||
getBuffered tlsBuffer n t_ (T.recvData tlsContext)
|
||||
|
||||
cPut :: TLS -> ByteString -> IO ()
|
||||
cPut cxt = cPut' cxt . LB.fromStrict
|
||||
|
||||
cPut' :: TLS -> LB.ByteString -> IO ()
|
||||
cPut' TLS {tlsContext, tlsTransportConfig = TransportConfig {transportTimeout = t_}} s =
|
||||
withTimedErr t_ $ T.sendData tlsContext s
|
||||
cPut TLS {tlsContext, tlsTransportConfig = TransportConfig {transportTimeout = t_}} =
|
||||
withTimedErr t_ . T.sendData tlsContext . LB.fromStrict
|
||||
|
||||
getLn :: TLS -> IO ByteString
|
||||
getLn TLS {tlsContext, tlsBuffer} = do
|
||||
@@ -316,10 +309,10 @@ serializeTransportError = \case
|
||||
TEHandshake e -> "HANDSHAKE " <> bshow e
|
||||
|
||||
-- | Pad and send block to SMP transport.
|
||||
tPutBlock :: Transport c => THandle c -> Builder -> IO (Either TransportError ())
|
||||
tPutBlock :: Transport c => THandle c -> ByteString -> IO (Either TransportError ())
|
||||
tPutBlock THandle {connection = c, blockSize} block =
|
||||
bimapM (const $ pure TELargeMsg) (cPut' c . toLazyByteString) $
|
||||
C.pad' block blockSize
|
||||
bimapM (const $ pure TELargeMsg) (cPut c) $
|
||||
C.pad block blockSize
|
||||
|
||||
-- | Receive block from SMP transport.
|
||||
tGetBlock :: Transport c => THandle c -> IO (Either TransportError ByteString)
|
||||
@@ -363,7 +356,7 @@ smpThHandle :: forall c. THandle c -> Version -> THandle c
|
||||
smpThHandle th v = (th :: THandle c) {thVersion = v, batch = v >= 4}
|
||||
|
||||
sendHandshake :: (Transport c, Encoding smp) => THandle c -> smp -> ExceptT TransportError IO ()
|
||||
sendHandshake th = ExceptT . tPutBlock th . byteString . smpEncode
|
||||
sendHandshake th = ExceptT . tPutBlock th . smpEncode
|
||||
|
||||
getHandshake :: (Transport c, Encoding smp) => THandle c -> ExceptT TransportError IO smp
|
||||
getHandshake th = ExceptT $ (parse smpP (TEHandshake PARSE) =<<) <$> tGetBlock th
|
||||
|
||||
@@ -72,9 +72,6 @@ instance Transport WS where
|
||||
cPut :: WS -> ByteString -> IO ()
|
||||
cPut = sendBinaryData . wsConnection
|
||||
|
||||
cPut' :: WS -> LB.ByteString -> IO ()
|
||||
cPut' = sendBinaryData . wsConnection
|
||||
|
||||
getLn :: WS -> IO ByteString
|
||||
getLn c = do
|
||||
s <- trimCR <$> receiveData (wsConnection c)
|
||||
|
||||
@@ -5,9 +5,8 @@ module CoreTests.BatchingTests (batchingTests) where
|
||||
import Control.Concurrent.STM
|
||||
import Control.Monad
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString as B
|
||||
import qualified Data.List.NonEmpty as L
|
||||
import Simplex.Messaging.Builder (Builder)
|
||||
import qualified Simplex.Messaging.Builder as BB
|
||||
import Simplex.Messaging.Client
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Protocol
|
||||
@@ -170,8 +169,8 @@ randomSENDCmd c len = do
|
||||
msg <- atomically $ C.randomBytes len g
|
||||
mkTransmission c (Just rpKey, sId, Cmd SSender $ SEND noMsgFlags msg)
|
||||
|
||||
lenOk :: Builder -> Bool
|
||||
lenOk s = 0 < BB.length s && BB.length s <= smpBlockSize - 2
|
||||
lenOk :: ByteString -> Bool
|
||||
lenOk s = 0 < B.length s && B.length s <= smpBlockSize - 2
|
||||
|
||||
lenOk1 :: TransportBatch r -> Bool
|
||||
lenOk1 = \case
|
||||
|
||||
@@ -136,7 +136,7 @@ ntfServerTest _ t = runNtfTest $ \h -> tPut' h t >> tGet' h
|
||||
tPut' :: THandle c -> (Maybe C.ASignature, ByteString, ByteString, smp) -> IO ()
|
||||
tPut' h@THandle {sessionId} (sig, corrId, queueId, smp) = do
|
||||
let t' = smpEncode (sessionId, corrId, queueId, smp)
|
||||
[Right ()] <- tPut h Nothing [(sig, t')]
|
||||
[Right ()] <- tPut h [(sig, t')]
|
||||
pure ()
|
||||
tGet' h = do
|
||||
[(Nothing, _, (CorrId corrId, qId, Right cmd))] <- tGet h
|
||||
|
||||
@@ -164,7 +164,7 @@ smpServerTest _ t = runSmpTest $ \h -> tPut' h t >> tGet' h
|
||||
tPut' :: THandle c -> (Maybe C.ASignature, ByteString, ByteString, smp) -> IO ()
|
||||
tPut' h@THandle {sessionId} (sig, corrId, queueId, smp) = do
|
||||
let t' = smpEncode (sessionId, corrId, queueId, smp)
|
||||
[Right ()] <- tPut h Nothing [(sig, t')]
|
||||
[Right ()] <- tPut h [(sig, t')]
|
||||
pure ()
|
||||
tGet' h = do
|
||||
[(Nothing, _, (CorrId corrId, qId, Right cmd))] <- tGet h
|
||||
|
||||
@@ -89,7 +89,7 @@ signSendRecv h@THandle {thVersion, sessionId} pk (corrId, qId, cmd) = do
|
||||
|
||||
tPut1 :: Transport c => THandle c -> SentRawTransmission -> IO (Either TransportError ())
|
||||
tPut1 h t = do
|
||||
[r] <- tPut h Nothing [t]
|
||||
[r] <- tPut h [t]
|
||||
pure r
|
||||
|
||||
tGet1 :: (ProtocolEncoding err cmd, Transport c, MonadIO m, MonadFail m) => THandle c -> m (SignedTransmission err cmd)
|
||||
|
||||
Reference in New Issue
Block a user