Merge branch 'master' into proxy

This commit is contained in:
Evgeny Poberezkin
2024-05-11 10:25:35 +01:00
7 changed files with 96 additions and 34 deletions
+15
View File
@@ -1,3 +1,18 @@
# 5.7.3
SMP/NTF protocol:
- add ALPN for handshake version negotiation, similar to XFTP (to preserve backwards compatibility with the old clients).
- upgrade clients to versions v7/v2 of the protocols.
SMP server:
- faster responses to subscription requests.
XFTP client:
- fix network exception during file download treated as permanent file error.
SMP agent:
- do not report subscription timeouts while client is offline.
# 5.7.2
SMP agent:
+1 -1
View File
@@ -1,5 +1,5 @@
name: simplexmq
version: 5.7.2.0
version: 5.7.3.1
synopsis: SimpleXMQ message broker
description: |
This package includes <./docs/Simplex-Messaging-Server.html server>,
+1 -1
View File
@@ -5,7 +5,7 @@ cabal-version: 1.12
-- see: https://github.com/sol/hpack
name: simplexmq
version: 5.7.2.0
version: 5.7.3.1
synopsis: SimpleXMQ message broker
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
<./docs/Simplex-Messaging-Client.html client> and
+44 -21
View File
@@ -53,7 +53,7 @@ import Data.Either (fromRight, partitionEithers)
import Data.Functor (($>))
import Data.Int (Int64)
import qualified Data.IntMap.Strict as IM
import Data.List (intercalate)
import Data.List (intercalate, mapAccumR)
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as L
import qualified Data.Map.Strict as M
@@ -429,7 +429,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
hPutStrLn h "AUTH"
runClientTransport :: Transport c => THandleSMP c 'TServer -> M ()
runClientTransport th@THandle {params = thParams@THandleParams {thVersion, sessionId}} = do
runClientTransport h@THandle {params = thParams@THandleParams {thVersion, sessionId}} = do
q <- asks $ tbqSize . config
ts <- liftIO getSystemTime
active <- asks clients
@@ -440,11 +440,12 @@ runClientTransport th@THandle {params = thParams@THandleParams {thVersion, sessi
pure new
s <- asks server
expCfg <- asks $ inactiveClientExpiration . config
th <- newMVar h -- put TH under a fair lock to interleave messages and command responses
labelMyThread . B.unpack $ "client $" <> encode sessionId
raceAny_ ([liftIO $ send th c, client thParams c s, receive th c] <> disconnectThread_ c expCfg)
raceAny_ ([liftIO $ send th c, liftIO $ sendMsg th c, client thParams c s, receive h c] <> disconnectThread_ c expCfg)
`finally` clientDisconnected c
where
disconnectThread_ c (Just expCfg) = [liftIO $ disconnectTransport th (rcvActiveAt c) (sndActiveAt c) expCfg (noSubscriptions c)]
disconnectThread_ c (Just expCfg) = [liftIO $ disconnectTransport h (rcvActiveAt c) (sndActiveAt c) expCfg (noSubscriptions c)]
disconnectThread_ _ _ = []
noSubscriptions c = atomically $ (&&) <$> TM.null (subscriptions c) <*> TM.null (ntfSubscriptions c)
@@ -477,10 +478,10 @@ cancelSub sub =
_ -> return ()
receive :: Transport c => THandleSMP c 'TServer -> Client -> M ()
receive th@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiveAt, sessionId} = do
receive h@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiveAt, sessionId} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " receive"
forever $ do
ts <- L.toList <$> liftIO (tGet th)
ts <- L.toList <$> liftIO (tGet h)
atomically . writeTVar rcvActiveAt =<< liftIO getSystemTime
(errs, cmds) <- partitionEithers <$> mapM cmdAction ts
write sndQ errs
@@ -497,20 +498,41 @@ receive th@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActi
VRFailed -> Left (corrId, entId, ERR AUTH)
write q = mapM_ (atomically . writeTBQueue q) . L.nonEmpty
send :: Transport c => THandleSMP c 'TServer -> Client -> IO ()
send h@THandle {params} Client {sndQ, sessionId, sndActiveAt} = do
send :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> IO ()
send th c@Client {sndQ, msgQ, sessionId} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " send"
forever $ do
ts <- atomically $ L.sortWith tOrder <$> readTBQueue sndQ
-- TODO we can authorize responses as well
void . liftIO . tPut h $ L.map (\t -> Right (Nothing, encodeTransmission params t)) ts
atomically . writeTVar sndActiveAt =<< liftIO getSystemTime
forever $ atomically (readTBQueue sndQ) >>= sendTransmissions
where
tOrder :: Transmission BrokerMsg -> Int
tOrder (_, _, cmd) = case cmd of
MSG {} -> 0
NMSG {} -> 0
_ -> 1
sendTransmissions :: NonEmpty (Transmission BrokerMsg) -> IO ()
sendTransmissions ts
| L.length ts <= 2 = tSend th c ts
| otherwise = do
let (msgs_, ts') = mapAccumR splitMessages [] ts
-- If the request had batched subscriptions (L.length ts > 2)
-- this will reply OK to all SUBs in the first batched transmission,
-- to reduce client timeouts.
tSend th c ts'
-- After that all messages will be sent in separate transmissions,
-- without any client response timeouts, and allowing them to interleave
-- with other requests responses.
mapM_ (atomically . writeTBQueue msgQ) $ L.nonEmpty msgs_
where
splitMessages :: [Transmission BrokerMsg] -> Transmission BrokerMsg -> ([Transmission BrokerMsg], Transmission BrokerMsg)
splitMessages msgs t@(corrId, entId, cmd) = case cmd of
-- replace MSG response with OK, accumulating MSG in a separate list.
MSG {} -> ((CorrId "", entId, cmd) : msgs, (corrId, entId, OK))
_ -> (msgs, t)
sendMsg :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> IO ()
sendMsg th c@Client {msgQ, sessionId} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " sendMsg"
forever $ atomically (readTBQueue msgQ) >>= mapM_ (\t -> tSend th c [t])
tSend :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> NonEmpty (Transmission BrokerMsg) -> IO ()
tSend th Client {sndActiveAt} ts = do
withMVar th $ \h@THandle {params} ->
void . tPut h $ L.map (\t -> Right (Nothing, encodeTransmission params t)) ts
atomically . writeTVar sndActiveAt =<< liftIO getSystemTime
disconnectTransport :: Transport c => THandle v c 'TServer -> TVar SystemTime -> TVar SystemTime -> ExpirationConfig -> IO Bool -> IO ()
disconnectTransport THandle {connection, params = THandleParams {sessionId}} rcvActiveAt sndActiveAt expCfg noSubscriptions = do
@@ -1085,9 +1107,10 @@ saveServerMessages keepMsgs = asks (storeMsgsFile . config) >>= mapM_ saveMessag
>>= mapM_ (B.hPutStrLn h . strEncode . MLRv3 rId)
restoreServerMessages :: M Int
restoreServerMessages = asks (storeMsgsFile . config) >>= \case
Just f -> ifM (doesFileExist f) (restoreMessages f) (pure 0)
Nothing -> pure 0
restoreServerMessages =
asks (storeMsgsFile . config) >>= \case
Just f -> ifM (doesFileExist f) (restoreMessages f) (pure 0)
Nothing -> pure 0
where
restoreMessages f = do
logInfo $ "restoring messages from file " <> T.pack f
+3 -1
View File
@@ -138,6 +138,7 @@ data Client = Client
ntfSubscriptions :: TMap NotifierId (),
rcvQ :: TBQueue (NonEmpty (Maybe QueueRec, Transmission Cmd)),
sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg)),
msgQ :: TBQueue (NonEmpty (Transmission BrokerMsg)),
endThreads :: TVar (IntMap (Weak ThreadId)),
endThreadSeq :: TVar Int,
thVersion :: VersionSMP,
@@ -172,13 +173,14 @@ newClient nextClientId qSize thVersion sessionId createdAt = do
ntfSubscriptions <- TM.empty
rcvQ <- newTBQueue qSize
sndQ <- newTBQueue qSize
msgQ <- newTBQueue qSize
endThreads <- newTVar IM.empty
endThreadSeq <- newTVar 0
connected <- newTVar True
rcvActiveAt <- newTVar createdAt
sndActiveAt <- newTVar createdAt
proxyClient_ <- newTVar Nothing
return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, proxyClient_}
return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, proxyClient_}
newSubscription :: SubscriptionThread -> STM Sub
newSubscription subThread = do
+7 -7
View File
@@ -27,9 +27,9 @@ import GHC.Stack (withFrozenCallStack)
import Network.HTTP.Types (urlEncode)
import SMPAgentClient
import SMPClient (testKeyHash, testPort, testPort2, testStoreLogFile, withSmpServer, withSmpServerStoreLogOn)
import Simplex.Messaging.Agent.Protocol hiding (MID, CONF, INFO, REQ, SENT)
import Simplex.Messaging.Agent.Protocol hiding (CONF, INFO, MID, REQ, SENT)
import qualified Simplex.Messaging.Agent.Protocol as A
import Simplex.Messaging.Crypto.Ratchet (InitialKeys (..), PQEncryption (..), PQSupport (..), pattern IKPQOn, pattern IKPQOff, pattern PQEncOn, pattern PQSupportOn, pattern PQSupportOff)
import Simplex.Messaging.Crypto.Ratchet (InitialKeys (..), PQEncryption (..), PQSupport (..), pattern IKPQOff, pattern IKPQOn, pattern PQEncOn, pattern PQSupportOff, pattern PQSupportOn)
import qualified Simplex.Messaging.Crypto.Ratchet as CR
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (ErrorType (..))
@@ -547,10 +547,10 @@ testResumeDeliveryQuotaExceeded _ alice bob = do
bob <#= \case ("", "alice", Msg "message 4") -> True; _ -> False
bob #: ("4", "alice", "ACK 7") #> ("4", "alice", OK)
inAnyOrder
(tGetAgent alice)
[ \case ("", c, Right (SENT 8)) -> c == "bob"; _ -> False,
\case ("", c, Right QCONT) -> c == "bob"; _ -> False
]
(tGetAgent alice)
[ \case ("", c, Right (SENT 8)) -> c == "bob"; _ -> False,
\case ("", c, Right QCONT) -> c == "bob"; _ -> False
]
bob <#= \case ("", "alice", Msg "over quota") -> True; _ -> False
-- message 8 is skipped because of alice agent sending "QCONT" message
bob #: ("5", "alice", "ACK 9") #> ("5", "alice", OK)
@@ -580,7 +580,7 @@ enableKEMStr _ = ""
pqConnModeStr :: InitialKeys -> ByteString
pqConnModeStr (IKNoPQ PQSupportOff) = ""
pqConnModeStr pq = " " <> strEncode pq
pqConnModeStr pq = " " <> strEncode pq
sendMessage :: Transport c => (c, ConnId) -> (c, ConnId) -> ByteString -> IO ()
sendMessage (h1, name1) (h2, name2) msg = do
+25 -3
View File
@@ -59,10 +59,10 @@ import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Either (isRight)
import Data.Int (Int64)
import Data.List (nub)
import Data.List (find, nub)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.Map as M
import Data.Maybe (isNothing)
import Data.Maybe (isJust, isNothing)
import qualified Data.Set as S
import Data.Time.Clock (diffUTCTime, getCurrentTime)
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
@@ -341,6 +341,9 @@ functionalAPITests t = do
skip "faster version of the previous test (200 subscriptions gets very slow with test coverage)" $
it "should subscribe to multiple (6) subscriptions with batching" $
testBatchedSubscriptions 6 3 t
it "should subscribe to multiple connections with pending messages" $
withSmpServer t $
testBatchedPendingMessages 10 5
describe "Async agent commands" $ do
it "should connect using async agent commands" $
withSmpServer t testAsyncCommands
@@ -1546,7 +1549,7 @@ testBatchedSubscriptions :: Int -> Int -> ATransport -> IO ()
testBatchedSubscriptions nCreate nDel t =
withAgentClientsCfgServers2 agentCfg agentCfg initAgentServers2 $ \a b -> do
conns <- runServers $ do
conns <- replicateM (nCreate :: Int) $ makeConnection_ PQSupportOff a b
conns <- replicateM nCreate $ makeConnection_ PQSupportOff a b
forM_ conns $ \(aId, bId) -> exchangeGreetings_ PQEncOff a bId b aId
let (aIds', bIds') = unzip $ take nDel conns
delete a bIds'
@@ -1605,6 +1608,25 @@ testBatchedSubscriptions nCreate nDel t =
killThread t1
pure res
testBatchedPendingMessages :: Int -> Int -> IO ()
testBatchedPendingMessages nCreate nMsgs =
withA $ \a -> do
conns <- withB $ \b -> runRight $ do
replicateM nCreate $ makeConnection a b
let msgConns = take nMsgs conns
runRight_ $ forM_ msgConns $ \(_, bId) -> sendMessage a bId SMP.noMsgFlags "hello"
replicateM_ nMsgs $ get a =##> \case ("", cId, SENT _) -> isJust $ find ((cId ==) . snd) msgConns; _ -> False
withB $ \b -> runRight_ $ do
r <- subscribeConnections b $ map fst conns
liftIO $ all isRight r `shouldBe` True
replicateM_ nMsgs $ do
("", cId, Msg' msgId _ "hello") <- get b
liftIO $ isJust (find ((cId ==) . fst) msgConns) `shouldBe` True
ackMessage b cId msgId Nothing
where
withA = withAgent 1 agentCfg initAgentServers testDB
withB = withAgent 2 agentCfg initAgentServers testDB2
testAsyncCommands :: IO ()
testAsyncCommands =
withAgentClients2 $ \alice bob -> runRight_ $ do