mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-01 18:16:24 +00:00
sized builder
This commit is contained in:
@@ -103,6 +103,7 @@ library
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231225_failed_work_items
|
||||
Simplex.Messaging.Agent.TAsyncs
|
||||
Simplex.Messaging.Agent.TRcvQueues
|
||||
Simplex.Messaging.Builder
|
||||
Simplex.Messaging.Client
|
||||
Simplex.Messaging.Client.Agent
|
||||
Simplex.Messaging.Crypto
|
||||
|
||||
@@ -13,7 +13,7 @@ import Control.Monad
|
||||
import Control.Monad.Except
|
||||
import Crypto.Random (ChaChaDRG)
|
||||
import Data.Bifunctor (first)
|
||||
import Data.ByteString.Builder (Builder)
|
||||
import qualified Data.ByteString.Builder as BB
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Int (Int64)
|
||||
@@ -35,6 +35,7 @@ import Simplex.Messaging.Client
|
||||
transportClientConfig,
|
||||
)
|
||||
import Simplex.Messaging.Client.Agent ()
|
||||
import Simplex.Messaging.Builder (Builder, builder)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import qualified Simplex.Messaging.Crypto.Lazy as LC
|
||||
import Simplex.Messaging.Encoding.String
|
||||
@@ -152,9 +153,9 @@ sendXFTPTransmission XFTPClient {config, http2Client = http2@HTTP2Client {sessio
|
||||
_ -> pure (r, body)
|
||||
Left e -> throwError $ PCEResponseError e
|
||||
where
|
||||
streamBody :: (Builder -> IO ()) -> IO () -> IO ()
|
||||
streamBody :: (BB.Builder -> IO ()) -> IO () -> IO ()
|
||||
streamBody send done = do
|
||||
send t
|
||||
send $ builder t
|
||||
forM_ chunkSpec_ $ \XFTPChunkSpec {filePath, chunkOffset, chunkSize} ->
|
||||
withFile filePath ReadMode $ \h -> do
|
||||
hSeek h AbsoluteSeek $ fromIntegral chunkOffset
|
||||
|
||||
@@ -17,7 +17,7 @@ import Control.Applicative ((<|>))
|
||||
import qualified Data.Aeson.TH as J
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import Data.Bifunctor (first)
|
||||
import Data.ByteString.Builder (Builder)
|
||||
import Simplex.Messaging.Builder (Builder)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Kind (Type)
|
||||
|
||||
@@ -43,6 +43,7 @@ import Simplex.FileTransfer.Server.Stats
|
||||
import Simplex.FileTransfer.Server.Store
|
||||
import Simplex.FileTransfer.Server.StoreLog
|
||||
import Simplex.FileTransfer.Transport
|
||||
import Simplex.Messaging.Builder (builder)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import qualified Simplex.Messaging.Crypto.Lazy as LC
|
||||
import Simplex.Messaging.Encoding.String
|
||||
@@ -241,7 +242,7 @@ processRequest HTTP2Request {sessionId, reqBody = body@HTTP2Body {bodyHead}, sen
|
||||
send "padding error" -- TODO respond with BLOCK error?
|
||||
done
|
||||
Right t -> do
|
||||
send t
|
||||
send $ builder t
|
||||
-- timeout sending file in the same way as receiving
|
||||
forM_ serverFile_ $ \ServerFile {filePath, fileSize, sbState} -> do
|
||||
withFile filePath ReadMode $ \h -> sendEncFile h send sbState (fromIntegral fileSize)
|
||||
|
||||
@@ -85,10 +85,10 @@ import Control.Monad
|
||||
import Control.Monad.IO.Class (liftIO)
|
||||
import Control.Monad.Trans.Except
|
||||
import qualified Data.Aeson.TH as J
|
||||
import Data.ByteString.Builder (Builder, lazyByteString)
|
||||
import Simplex.Messaging.Builder (Builder)
|
||||
import qualified Simplex.Messaging.Builder as BB
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import qualified Data.ByteString.Lazy.Char8 as LB
|
||||
import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
import Data.List (find)
|
||||
@@ -675,14 +675,12 @@ sendProtocolCommand c@ProtocolClient {client_ = PClient {sndQ}, batch, blockSize
|
||||
-- two separate "atomically" needed to avoid blocking
|
||||
sendRecv :: SentRawTransmission -> Request err msg -> IO (Either (ProtocolClientError err) msg)
|
||||
sendRecv t r
|
||||
| sLen > blockSize - 2 = pure $ Left $ PCETransportError TELargeMsg
|
||||
| BB.length s > blockSize - 2 = pure $ Left $ PCETransportError TELargeMsg
|
||||
| otherwise = atomically (writeTBQueue sndQ s) >> response <$> getResponse c r
|
||||
where
|
||||
(sLen, s)
|
||||
| batch = (tLen + 3, tEncodeBatch 1 . encodeLarge $ tEncode t)
|
||||
| otherwise = (tLen, lazyByteString $ tEncode t)
|
||||
t' = tEncode t
|
||||
tLen = fromIntegral $ LB.length t'
|
||||
s
|
||||
| batch = tEncodeBatch 1 . encodeLarge $ tEncode t
|
||||
| otherwise = tEncode t
|
||||
|
||||
-- TODO switch to timeout or TimeManager that supports Int64
|
||||
getResponse :: ProtocolClient err msg -> Request err msg -> IO (Response err msg)
|
||||
|
||||
@@ -191,10 +191,10 @@ import Data.ByteArray (ByteArrayAccess)
|
||||
import qualified Data.ByteArray as BA
|
||||
import Data.ByteString.Base64 (decode, encode)
|
||||
import qualified Data.ByteString.Base64.URL as U
|
||||
import Data.ByteString.Builder (Builder, byteString, toLazyByteString, word16BE)
|
||||
import Simplex.Messaging.Builder (Builder, byteString, word16BE)
|
||||
import qualified Simplex.Messaging.Builder as BB
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import qualified Data.ByteString.Lazy.Char8 as LB
|
||||
import Data.ByteString.Lazy (fromStrict, toStrict)
|
||||
import Data.Constraint (Dict (..))
|
||||
import Data.Kind (Constraint, Type)
|
||||
@@ -924,10 +924,10 @@ pad msg paddedLen
|
||||
|
||||
pad' :: Builder -> Int -> Either CryptoError Builder
|
||||
pad' msg paddedLen
|
||||
| len <= maxMsgLen && padLen >= 0 = Right $ byteString (encodeWord16 $ fromIntegral len) <> msg <> byteString (B.replicate padLen '#')
|
||||
| len <= maxMsgLen && padLen >= 0 = Right $ word16BE (fromIntegral len) <> msg <> byteString (B.replicate padLen '#')
|
||||
| otherwise = Left CryptoLargeMsgError
|
||||
where
|
||||
len = fromIntegral $ LB.length $ toLazyByteString msg
|
||||
len = BB.length msg
|
||||
padLen = paddedLen - len - 2
|
||||
|
||||
unPad :: ByteString -> Either CryptoError ByteString
|
||||
|
||||
@@ -22,10 +22,10 @@ where
|
||||
import Data.Attoparsec.ByteString.Char8 (Parser)
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import Data.Bits (shiftL, shiftR, (.|.))
|
||||
import Data.ByteString.Builder (Builder, byteString, lazyByteString, word16BE)
|
||||
import Simplex.Messaging.Builder (Builder, word16BE)
|
||||
import qualified Simplex.Messaging.Builder as BB
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import qualified Data.ByteString.Lazy.Char8 as LB
|
||||
import Data.ByteString.Internal (c2w, w2c)
|
||||
import Data.Int (Int64)
|
||||
import qualified Data.List.NonEmpty as L
|
||||
@@ -141,8 +141,8 @@ instance Encoding Large where
|
||||
Large <$> A.take len
|
||||
{-# INLINE smpP #-}
|
||||
|
||||
encodeLarge :: LB.ByteString -> Builder
|
||||
encodeLarge s = byteString (encodeWord16 $ fromIntegral $ LB.length s) <> lazyByteString s
|
||||
encodeLarge :: Builder -> Builder
|
||||
encodeLarge s = word16BE (fromIntegral $ BB.length s) <> s
|
||||
{-# INLINE encodeLarge #-}
|
||||
|
||||
instance Encoding SystemTime where
|
||||
|
||||
@@ -161,7 +161,8 @@ import Data.Aeson (FromJSON (..), ToJSON (..))
|
||||
import qualified Data.Aeson.TH as J
|
||||
import Data.Attoparsec.ByteString.Char8 (Parser, (<?>))
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import Data.ByteString.Builder (Builder, char8, lazyByteString)
|
||||
import Simplex.Messaging.Builder (Builder, char8, lazyByteString)
|
||||
import qualified Simplex.Messaging.Builder as BB
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import qualified Data.ByteString.Lazy.Char8 as LB
|
||||
@@ -1313,31 +1314,29 @@ batchTransmissions batch bSize = batchTransmissions' batch bSize . L.map (,())
|
||||
-- | encodes and batches transmissions into blocks,
|
||||
batchTransmissions' :: forall r. Bool -> Int -> NonEmpty (SentRawTransmission, r) -> [TransportBatch r]
|
||||
batchTransmissions' batch bSize
|
||||
| batch = addBatch . foldr addTransmission ([], mempty, 0, 0, [])
|
||||
| batch = addBatch . foldr addTransmission ([], mempty, 0, [])
|
||||
| otherwise = map mkBatch1 . L.toList
|
||||
where
|
||||
mkBatch1 :: (SentRawTransmission, r) -> TransportBatch r
|
||||
mkBatch1 (t, r)
|
||||
-- 2 bytes are reserved for pad size
|
||||
| LB.length s <= fromIntegral (bSize - 2) = TBTransmission (lazyByteString s) r
|
||||
| BB.length s <= fromIntegral (bSize - 2) = TBTransmission s r
|
||||
| otherwise = TBLargeTransmission r
|
||||
where
|
||||
s = tEncode t
|
||||
addTransmission :: (SentRawTransmission, r) -> ([TransportBatch r], Builder, Int, Int, [r]) -> ([TransportBatch r], Builder, Int, Int, [r])
|
||||
addTransmission (t, r) acc@(bs, b, len, n, rs)
|
||||
| len' <= bSize - 3 && n < 255 = (bs, s <> b, len', 1 + n, r : rs)
|
||||
| sLen <= bSize - 3 = (addBatch acc, s, sLen, 1, [r])
|
||||
| otherwise = (TBLargeTransmission r : addBatch acc, mempty, 0, 0, [])
|
||||
addTransmission :: (SentRawTransmission, r) -> ([TransportBatch r], Builder, Int, [r]) -> ([TransportBatch r], Builder, Int, [r])
|
||||
addTransmission (t, r) acc@(bs, b, n, rs)
|
||||
| len + BB.length b <= bSize - 3 && n < 255 = (bs, s <> b, 1 + n, r : rs)
|
||||
| len <= bSize - 3 = (addBatch acc, s, 1, [r])
|
||||
| otherwise = (TBLargeTransmission r : addBatch acc, mempty, 0, [])
|
||||
where
|
||||
s = encodeLarge t'
|
||||
sLen = 2 + fromIntegral (LB.length t') -- 2-bytes length is added by encodeLarge
|
||||
t' = tEncode t
|
||||
len' = sLen + len
|
||||
addBatch :: ([TransportBatch r], Builder, Int, Int, [r]) -> [TransportBatch r]
|
||||
addBatch (bs, b, _, n, rs) = if n == 0 then bs else TBTransmissions b n rs : bs
|
||||
s = encodeLarge $ tEncode t
|
||||
len = BB.length s
|
||||
addBatch :: ([TransportBatch r], Builder, Int, [r]) -> [TransportBatch r]
|
||||
addBatch (bs, b, n, rs) = if n == 0 then bs else TBTransmissions b n rs : bs
|
||||
|
||||
tEncode :: SentRawTransmission -> LB.ByteString
|
||||
tEncode (sig, t) = LB.chunk (smpEncode $ C.signatureBytes sig) (LB.fromStrict t)
|
||||
tEncode :: SentRawTransmission -> Builder
|
||||
tEncode (sig, t) = lazyByteString $ LB.chunk (smpEncode $ C.signatureBytes sig) (LB.fromStrict t)
|
||||
{-# INLINE tEncode #-}
|
||||
|
||||
tEncodeBatch :: Int -> Builder -> Builder
|
||||
|
||||
@@ -67,7 +67,7 @@ import qualified Data.Aeson.TH as J
|
||||
import Data.Attoparsec.ByteString.Char8 (Parser)
|
||||
import Data.Bifunctor (first)
|
||||
import Data.Bitraversable (bimapM)
|
||||
import Data.ByteString.Builder (Builder, byteString, toLazyByteString)
|
||||
import Simplex.Messaging.Builder (Builder, byteString, toLazyByteString)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import qualified Data.ByteString.Lazy.Char8 as LB
|
||||
|
||||
@@ -508,7 +508,7 @@ testNotificationsSMPRestartBatch n t APNSMockServer {apnsQ} = do
|
||||
conns <- replicateM (n :: Int) $ makeConnection a b
|
||||
_ <- registerTestToken a "abcd" NMInstant apnsQ
|
||||
liftIO $ threadDelay 5000000
|
||||
forM_ (zip [0..] conns) $ \(i, (aliceId, bobId)) -> do
|
||||
forM_ conns $ \(aliceId, bobId) -> do
|
||||
msgId <- sendMessage b aliceId (SMP.MsgFlags True) "hello"
|
||||
get b ##> ("", aliceId, SENT msgId)
|
||||
void $ messageNotification apnsQ
|
||||
|
||||
@@ -4,10 +4,10 @@ module CoreTests.BatchingTests (batchingTests) where
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import Control.Monad
|
||||
import Data.ByteString.Builder (Builder, toLazyByteString)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Lazy.Char8 as LB
|
||||
import qualified Data.List.NonEmpty as L
|
||||
import Simplex.Messaging.Builder (Builder)
|
||||
import qualified Simplex.Messaging.Builder as BB
|
||||
import Simplex.Messaging.Client
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Protocol
|
||||
@@ -171,9 +171,7 @@ randomSENDCmd c len = do
|
||||
mkTransmission c (Just rpKey, sId, Cmd SSender $ SEND noMsgFlags msg)
|
||||
|
||||
lenOk :: Builder -> Bool
|
||||
lenOk s = 0 < len && len <= smpBlockSize - 2
|
||||
where
|
||||
len = fromIntegral . LB.length $ toLazyByteString s
|
||||
lenOk s = 0 < BB.length s && BB.length s <= smpBlockSize - 2
|
||||
|
||||
lenOk1 :: TransportBatch r -> Bool
|
||||
lenOk1 = \case
|
||||
|
||||
Reference in New Issue
Block a user