Merge branch 'stable'

This commit is contained in:
Evgeny Poberezkin
2021-12-17 15:44:58 +00:00
11 changed files with 79 additions and 23 deletions
+1
View File
@@ -4,6 +4,7 @@ on:
push:
branches:
- master
- stable
tags:
- "v*"
pull_request:
+4
View File
@@ -1,3 +1,7 @@
# 0.5.1
- Fix server subscription logic bug that was leading to memory leak / resource exhaustion in some edge cases.
# 0.5.0
- No changes in SMP server implementation - it is backwards compatible with v0.4.1
+8
View File
@@ -3,6 +3,14 @@
[![GitHub build](https://github.com/simplex-chat/simplexmq/workflows/build/badge.svg)](https://github.com/simplex-chat/simplexmq/actions?query=workflow%3Abuild)
[![GitHub release](https://img.shields.io/github/v/release/simplex-chat/simplexmq)](https://github.com/simplex-chat/simplexmq/releases)
📢 **v0.5.1 brings a hotfix to the server's subscription management logic, to apply it log in to your server via SSH and run the following command. If you have store log enabled for your server, information about already established queues will be preserved.** If you're doing a custom installation instead of Linode or DigitalOcean you may have to change the path for binary download.
```sh
systemctl stop smp-server
curl -L -o /opt/simplex/bin/smp-server https://github.com/simplex-chat/simplexmq/releases/download/v0.5.1/smp-server-ubuntu-20_04-x86-64
systemctl start smp-server
```
## Message broker for unidirectional (simplex) queues
SimpleXMQ is a message broker for managing message queues and sending messages over public network. It consists of SMP server, SMP client library and SMP agent that implement [SMP protocol](https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md) for client-server communication and [SMP agent protocol](https://github.com/simplex-chat/simplexmq/blob/master/protocol/agent-protocol.md) to manage duplex connections via simplex queues on multiple SMP servers.
+1
View File
@@ -42,6 +42,7 @@ serverConfig :: ServerConfig
serverConfig =
ServerConfig
{ tbqSize = 16,
serverTbqSize = 128,
msgQueueQuota = 256,
queueIdBytes = 24,
msgIdBytes = 24, -- must be at least 24 bytes, it is used as 192-bit nonce for XSalsa20
+1 -1
View File
@@ -1,5 +1,5 @@
name: simplexmq
version: 0.5.0
version: 0.5.1
synopsis: SimpleXMQ message broker
description: |
This package includes <./docs/Simplex-Messaging-Server.html server>,
+1 -1
View File
@@ -5,7 +5,7 @@ cabal-version: 1.12
-- see: https://github.com/sol/hpack
name: simplexmq
version: 0.5.0
version: 0.5.1
synopsis: SimpleXMQ message broker
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
<./docs/Simplex-Messaging-Client.html client> and
+33 -13
View File
@@ -96,18 +96,26 @@ runSMPServerBlocking started cfg@ServerConfig {transports} = do
(s -> m' ()) ->
m' ()
serverThread s subQ subs clientSubs unsub = forever $ do
atomically updateSubscribers >>= mapM_ unsub
atomically updateSubscribers
>>= fmap join . mapM endPreviousSubscriptions
>>= mapM_ unsub
where
updateSubscribers :: STM (Maybe s)
updateSubscribers :: STM (Maybe (QueueId, Client))
updateSubscribers = do
(qId, clnt) <- readTBQueue $ subQ s
serverSubs <- readTVar $ subs s
writeTVar (subs s) $ M.insert qId clnt serverSubs
join <$> mapM (endPreviousSubscriptions qId) (M.lookup qId serverSubs)
endPreviousSubscriptions :: QueueId -> Client -> STM (Maybe s)
endPreviousSubscriptions qId c = do
writeTBQueue (sndQ c) (CorrId "", qId, END)
stateTVar (clientSubs c) $ \ss -> (M.lookup qId ss, M.delete qId ss)
let clientToBeNotified = \c' ->
if sameClientSession clnt c'
then pure Nothing
else do
yes <- readTVar $ connected c'
pure $ if yes then Just (qId, c') else Nothing
stateTVar (subs s) (\cs -> (M.lookup qId cs, M.insert qId clnt cs))
>>= fmap join . mapM clientToBeNotified
endPreviousSubscriptions :: (QueueId, Client) -> m' (Maybe s)
endPreviousSubscriptions (qId, c) = do
void . forkIO . atomically $
writeTBQueue (sndQ c) (CorrId "", qId, END)
atomically . stateTVar (clientSubs c) $ \ss -> (M.lookup qId ss, M.delete qId ss)
runClient :: (Transport c, MonadUnliftIO m, MonadReader Env m) => TProxy c -> c -> m ()
runClient _ h = do
@@ -123,11 +131,23 @@ runClientTransport th@THandle {sessionId} = do
c <- atomically $ newClient q sessionId
s <- asks server
raceAny_ [send th c, client c s, receive th c]
`finally` cancelSubscribers c
`finally` clientDisconnected c
cancelSubscribers :: MonadUnliftIO m => Client -> m ()
cancelSubscribers Client {subscriptions} =
readTVarIO subscriptions >>= mapM_ cancelSub
clientDisconnected :: (MonadUnliftIO m, MonadReader Env m) => Client -> m ()
clientDisconnected c@Client {subscriptions, connected} = do
atomically $ writeTVar connected False
subs <- readTVarIO subscriptions
mapM_ cancelSub subs
cs <- asks $ subscribers . server
atomically . mapM_ (modifyTVar cs . M.update deleteCurrentClient) $ M.keys subs
where
deleteCurrentClient :: Client -> Maybe Client
deleteCurrentClient c'
| sameClientSession c c' = Nothing
| otherwise = Just c'
sameClientSession :: Client -> Client -> Bool
sameClientSession Client {sessionId = s} Client {sessionId = s'} = False -- TODO replace with s == s'
cancelSub :: MonadUnliftIO m => Sub -> m ()
cancelSub = \case
+8 -5
View File
@@ -25,11 +25,12 @@ import System.IO (IOMode (..))
import UnliftIO.STM
data ServerConfig = ServerConfig
{ tbqSize :: Natural,
{ transports :: [(ServiceName, ATransport)],
tbqSize :: Natural,
serverTbqSize :: Natural,
msgQueueQuota :: Natural,
queueIdBytes :: Int,
msgIdBytes :: Int,
transports :: [(ServiceName, ATransport)],
storeLog :: Maybe (StoreLog 'ReadMode),
blockSize :: Int,
serverPrivateKey :: C.PrivateKey 'C.RSA, -- TODO delete
@@ -60,7 +61,8 @@ data Client = Client
ntfSubscriptions :: TVar (Map NotifierId ()),
rcvQ :: TBQueue (Transmission ClientCmd),
sndQ :: TBQueue BrokerTransmission,
sessionId :: ByteString
sessionId :: ByteString,
connected :: TVar Bool
}
data SubscriptionThread = NoSub | SubPending | SubThread ThreadId
@@ -84,7 +86,8 @@ newClient qSize sessionId = do
ntfSubscriptions <- newTVar M.empty
rcvQ <- newTBQueue qSize
sndQ <- newTBQueue qSize
return Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId}
connected <- newTVar True
return Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId, connected}
newSubscription :: STM Sub
newSubscription = do
@@ -93,7 +96,7 @@ newSubscription = do
newEnv :: forall m. (MonadUnliftIO m, MonadRandom m) => ServerConfig -> m Env
newEnv config = do
server <- atomically $ newServer (tbqSize config)
server <- atomically $ newServer (serverTbqSize config)
queueStore <- atomically newQueueStore
msgStore <- atomically newMsgStore
idsDrg <- drgNew >>= newTVarIO
+1 -1
View File
@@ -320,7 +320,7 @@ major :: SMPVersion -> (Int, Int)
major (SMPVersion a b _ _) = (a, b)
currentSMPVersion :: SMPVersion
currentSMPVersion = "0.5.0.0"
currentSMPVersion = "0.5.1.0"
currentSMPVersionStr :: ByteString
currentSMPVersionStr = serializeSMPVersion currentSMPVersion
+1
View File
@@ -61,6 +61,7 @@ cfg =
ServerConfig
{ transports = undefined,
tbqSize = 1,
serverTbqSize = 1,
msgQueueQuota = 4,
queueIdBytes = 24,
msgIdBytes = 24,
+20 -2
View File
@@ -11,7 +11,7 @@ module ServerTests where
import Control.Concurrent (ThreadId, killThread)
import Control.Concurrent.STM
import Control.Exception (SomeException, try)
import Control.Monad.Except (forM_, runExceptT)
import Control.Monad.Except (forM, forM_, runExceptT)
import Data.ByteString.Base64
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
@@ -34,9 +34,10 @@ serverTests t = do
describe "SMP queues" $ do
describe "NEW and KEY commands, SEND messages" $ testCreateSecure t
describe "NEW, OFF and DEL commands, SEND messages" $ testCreateDelete t
describe "Stress test" $ stressTest t
describe "SMP messages" $ do
describe "duplex communication over 2 SMP connections" $ testDuplex t
describe "switch subscription to another SMP queue" $ testSwitchSub t
describe "switch subscription to another TCP connection" $ testSwitchSub t
describe "Store log" $ testWithStoreLog t
describe "Timing of AUTH error" $ testTiming t
describe "Message notifications" $ testMessageNotifications t
@@ -188,6 +189,23 @@ testCreateDelete (ATransport t) =
Resp "cdab" _ err10 <- signSendRecv rh rKey ("cdab", rId, "SUB")
(err10, ERR AUTH) #== "rejects SUB when deleted"
stressTest :: ATransport -> Spec
stressTest (ATransport t) =
it "should create many queues, disconnect and re-connect" $
smpTest3 t $ \h1 h2 h3 -> do
(rPub, rKey) <- C.generateSignatureKeyPair 0 C.SEd25519
(dhPub, _ :: C.PrivateKey 'C.X25519) <- C.generateKeyPair' 0
rIds <- forM [1 .. 50 :: Int] . const $ do
Resp "" "" (Ids rId _ _) <- signSendRecv h1 rKey ("", "", B.unwords ["NEW", C.serializeKey rPub, C.serializeKey dhPub])
pure rId
let subscribeQueues h = forM_ rIds $ \rId -> do
Resp "" rId' OK <- signSendRecv h rKey ("", rId, "SUB")
rId' `shouldBe` rId
closeConnection $ connection h1
subscribeQueues h2
closeConnection $ connection h2
subscribeQueues h3
testDuplex :: ATransport -> Spec
testDuplex (ATransport t) =
it "should create 2 simplex connections and exchange messages" $