mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 14:16:00 +00:00
remove client from servers subscribers map after client disconnection (#228)
Co-authored-by: Efim Poberezkin <8711996+efim-poberezkin@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
f15067cf68
commit
5f7fe8b0dc
1
.github/workflows/build.yml
vendored
1
.github/workflows/build.yml
vendored
@@ -4,6 +4,7 @@ on:
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
- stable
|
||||
tags:
|
||||
- "v*"
|
||||
pull_request:
|
||||
|
||||
@@ -3,6 +3,12 @@
|
||||
[](https://github.com/simplex-chat/simplexmq/actions?query=workflow%3Abuild)
|
||||
[](https://github.com/simplex-chat/simplexmq/releases)
|
||||
|
||||
📢 **v0.5.1 brings a hotfix to the server's subscription management logic, to apply it log in to your server via SSH and run the following command. If you have store log enabled for your server, information about already established queues will be preserved.** If you're doing a custom installation instead of Linode or DigitalOcean you may have to change the path for binary download.
|
||||
|
||||
```sh
|
||||
systemctl stop smp-server && curl -L -o /opt/simplex/bin/smp-server https://github.com/simplex-chat/simplexmq/releases/download/v0.5.1/smp-server-ubuntu-20_04-x86-64 && chmod +x /opt/simplex/bin/smp-server && systemctl start smp-server
|
||||
```
|
||||
|
||||
## Message broker for unidirectional (simplex) queues
|
||||
|
||||
SimpleXMQ is a message broker for managing message queues and sending messages over public network. It consists of SMP server, SMP client library and SMP agent that implement [SMP protocol](https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md) for client-server communication and [SMP agent protocol](https://github.com/simplex-chat/simplexmq/blob/master/protocol/agent-protocol.md) to manage duplex connections via simplex queues on multiple SMP servers.
|
||||
|
||||
@@ -41,6 +41,7 @@ serverConfig :: ServerConfig
|
||||
serverConfig =
|
||||
ServerConfig
|
||||
{ tbqSize = 16,
|
||||
serverTbqSize = 128,
|
||||
msgQueueQuota = 256,
|
||||
queueIdBytes = 12,
|
||||
msgIdBytes = 6,
|
||||
|
||||
@@ -79,13 +79,25 @@ runSMPServerBlocking started cfg@ServerConfig {transports} = do
|
||||
runServer (tcpPort, ATransport t) = runTransportServer started tcpPort (runClient t)
|
||||
|
||||
serverThread :: MonadUnliftIO m' => Server -> m' ()
|
||||
serverThread Server {subscribedQ, subscribers} = forever . atomically $ do
|
||||
(rId, clnt) <- readTBQueue subscribedQ
|
||||
cs <- readTVar subscribers
|
||||
case M.lookup rId cs of
|
||||
Just Client {rcvQ} -> writeTBQueue rcvQ (CorrId B.empty, rId, Cmd SBroker END)
|
||||
Nothing -> return ()
|
||||
writeTVar subscribers $ M.insert rId clnt cs
|
||||
serverThread Server {subscribedQ, subscribers} = forever $ do
|
||||
atomically updateSubscribers >>= \case
|
||||
Just (rId, Client {rcvQ}) ->
|
||||
void . forkIO . atomically $
|
||||
writeTBQueue rcvQ (CorrId "", rId, Cmd SBroker END)
|
||||
_ -> pure ()
|
||||
where
|
||||
updateSubscribers :: STM (Maybe (RecipientId, Client))
|
||||
updateSubscribers = do
|
||||
(rId, c) <- readTBQueue subscribedQ
|
||||
stateTVar subscribers (\cs -> (M.lookup rId cs, M.insert rId c cs)) >>= \case
|
||||
Just c' -> clientToBeNotified rId c c'
|
||||
_ -> pure Nothing
|
||||
clientToBeNotified :: RecipientId -> Client -> Client -> STM (Maybe (RecipientId, Client))
|
||||
clientToBeNotified rId c c'@Client {connected}
|
||||
| clientId c /= clientId c' = do
|
||||
yes <- readTVar connected
|
||||
pure $ if yes then Just (rId, c') else Nothing
|
||||
| otherwise = pure Nothing
|
||||
|
||||
runClient :: (Transport c, MonadUnliftIO m, MonadReader Env m) => TProxy c -> c -> m ()
|
||||
runClient _ h = do
|
||||
@@ -98,14 +110,23 @@ runClient _ h = do
|
||||
runClientTransport :: (Transport c, MonadUnliftIO m, MonadReader Env m) => THandle c -> m ()
|
||||
runClientTransport th = do
|
||||
q <- asks $ tbqSize . config
|
||||
c <- atomically $ newClient q
|
||||
s <- asks server
|
||||
c <- atomically $ newClient s q
|
||||
raceAny_ [send th c, client c s, receive th c]
|
||||
`finally` cancelSubscribers c
|
||||
`finally` clientDisconnected c
|
||||
|
||||
cancelSubscribers :: MonadUnliftIO m => Client -> m ()
|
||||
cancelSubscribers Client {subscriptions} =
|
||||
readTVarIO subscriptions >>= mapM_ cancelSub
|
||||
clientDisconnected :: (MonadUnliftIO m, MonadReader Env m) => Client -> m ()
|
||||
clientDisconnected c@Client {subscriptions, connected} = do
|
||||
atomically $ writeTVar connected False
|
||||
subs <- readTVarIO subscriptions
|
||||
mapM_ cancelSub subs
|
||||
cs <- asks $ subscribers . server
|
||||
atomically . mapM_ (modifyTVar cs . M.update deleteCurrentClient) $ M.keys subs
|
||||
where
|
||||
deleteCurrentClient :: Client -> Maybe Client
|
||||
deleteCurrentClient c'
|
||||
| clientId c == clientId c' = Nothing
|
||||
| otherwise = Just c'
|
||||
|
||||
cancelSub :: MonadUnliftIO m => Sub -> m ()
|
||||
cancelSub = \case
|
||||
@@ -326,7 +347,7 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} =
|
||||
subscriber :: MsgQueue -> m ()
|
||||
subscriber q = atomically $ do
|
||||
msg <- peekMsg q
|
||||
writeTBQueue sndQ $ mkResp (CorrId B.empty) rId (msgCmd msg)
|
||||
writeTBQueue sndQ $ mkResp (CorrId "") rId (msgCmd msg)
|
||||
setSub (\s -> s {subThread = NoSub})
|
||||
void setDelivered
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
module Simplex.Messaging.Server.Env.STM where
|
||||
|
||||
import Control.Concurrent (ThreadId)
|
||||
import Control.Concurrent.STM (stateTVar)
|
||||
import Control.Monad.IO.Unlift
|
||||
import Crypto.Random
|
||||
import Data.Map.Strict (Map)
|
||||
@@ -25,6 +26,7 @@ import UnliftIO.STM
|
||||
data ServerConfig = ServerConfig
|
||||
{ transports :: [(ServiceName, ATransport)],
|
||||
tbqSize :: Natural,
|
||||
serverTbqSize :: Natural,
|
||||
msgQueueQuota :: Natural,
|
||||
queueIdBytes :: Int,
|
||||
msgIdBytes :: Int,
|
||||
@@ -46,13 +48,16 @@ data Env = Env
|
||||
|
||||
data Server = Server
|
||||
{ subscribedQ :: TBQueue (RecipientId, Client),
|
||||
subscribers :: TVar (Map RecipientId Client)
|
||||
subscribers :: TVar (Map RecipientId Client),
|
||||
nextClientId :: TVar Natural
|
||||
}
|
||||
|
||||
data Client = Client
|
||||
{ subscriptions :: TVar (Map RecipientId Sub),
|
||||
rcvQ :: TBQueue Transmission,
|
||||
sndQ :: TBQueue Transmission
|
||||
sndQ :: TBQueue Transmission,
|
||||
clientId :: Natural,
|
||||
connected :: TVar Bool
|
||||
}
|
||||
|
||||
data SubscriptionThread = NoSub | SubPending | SubThread ThreadId
|
||||
@@ -66,14 +71,17 @@ newServer :: Natural -> STM Server
|
||||
newServer qSize = do
|
||||
subscribedQ <- newTBQueue qSize
|
||||
subscribers <- newTVar M.empty
|
||||
return Server {subscribedQ, subscribers}
|
||||
nextClientId <- newTVar 0
|
||||
return Server {subscribedQ, subscribers, nextClientId}
|
||||
|
||||
newClient :: Natural -> STM Client
|
||||
newClient qSize = do
|
||||
newClient :: Server -> Natural -> STM Client
|
||||
newClient Server {nextClientId} qSize = do
|
||||
subscriptions <- newTVar M.empty
|
||||
rcvQ <- newTBQueue qSize
|
||||
sndQ <- newTBQueue qSize
|
||||
return Client {subscriptions, rcvQ, sndQ}
|
||||
clientId <- stateTVar nextClientId $ \i -> (i, i + 1)
|
||||
connected <- newTVar True
|
||||
return Client {subscriptions, rcvQ, sndQ, clientId, connected}
|
||||
|
||||
newSubscription :: STM Sub
|
||||
newSubscription = do
|
||||
@@ -82,7 +90,7 @@ newSubscription = do
|
||||
|
||||
newEnv :: forall m. (MonadUnliftIO m, MonadRandom m) => ServerConfig -> m Env
|
||||
newEnv config = do
|
||||
server <- atomically $ newServer (tbqSize config)
|
||||
server <- atomically $ newServer (serverTbqSize config)
|
||||
queueStore <- atomically newQueueStore
|
||||
msgStore <- atomically newMsgStore
|
||||
idsDrg <- drgNew >>= newTVarIO
|
||||
|
||||
@@ -60,6 +60,7 @@ cfg =
|
||||
ServerConfig
|
||||
{ transports = undefined,
|
||||
tbqSize = 1,
|
||||
serverTbqSize = 1,
|
||||
msgQueueQuota = 4,
|
||||
queueIdBytes = 12,
|
||||
msgIdBytes = 6,
|
||||
|
||||
@@ -11,7 +11,7 @@ module ServerTests where
|
||||
import Control.Concurrent (ThreadId, killThread)
|
||||
import Control.Concurrent.STM
|
||||
import Control.Exception (SomeException, try)
|
||||
import Control.Monad.Except (forM_, runExceptT)
|
||||
import Control.Monad.Except (forM, forM_, runExceptT)
|
||||
import Data.ByteString.Base64
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
@@ -34,6 +34,7 @@ serverTests t = do
|
||||
describe "SMP queues" do
|
||||
describe "NEW and KEY commands, SEND messages" $ testCreateSecure t
|
||||
describe "NEW, OFF and DEL commands, SEND messages" $ testCreateDelete t
|
||||
describe "Stress test" $ stressTest t
|
||||
describe "SMP messages" do
|
||||
describe "duplex communication over 2 SMP connections" $ testDuplex t
|
||||
describe "switch subscription to another SMP queue" $ testSwitchSub t
|
||||
@@ -179,6 +180,22 @@ testCreateDelete (ATransport t) =
|
||||
Resp "cdab" _ err10 <- signSendRecv rh rKey ("cdab", rId, "SUB")
|
||||
(err10, ERR AUTH) #== "rejects SUB when deleted"
|
||||
|
||||
stressTest :: ATransport -> Spec
|
||||
stressTest (ATransport t) =
|
||||
it "should create many queues, disconnect and re-connect" $
|
||||
smpTest3 t $ \h1 h2 h3 -> do
|
||||
(rPub, rKey) <- C.generateKeyPair rsaKeySize
|
||||
rIds <- forM [1 .. 50 :: Int] . const $ do
|
||||
Resp "" "" (IDS rId _) <- signSendRecv h1 rKey ("", "", "NEW " <> C.serializePubKey rPub)
|
||||
pure rId
|
||||
let subscribeQueues h = forM_ rIds $ \rId -> do
|
||||
Resp "" rId' OK <- signSendRecv h rKey ("", rId, "SUB")
|
||||
rId' `shouldBe` rId
|
||||
closeConnection $ connection h1
|
||||
subscribeQueues h2
|
||||
closeConnection $ connection h2
|
||||
subscribeQueues h3
|
||||
|
||||
testDuplex :: ATransport -> Spec
|
||||
testDuplex (ATransport t) =
|
||||
it "should create 2 simplex connections and exchange messages" $
|
||||
|
||||
Reference in New Issue
Block a user