Merge branch 'master' into f/notifications

This commit is contained in:
Evgeny Poberezkin
2022-06-07 14:17:05 +01:00
5 changed files with 30 additions and 26 deletions

View File

@@ -1,3 +1,13 @@
# 2.2.0
SMP server:
- Fix sockets/threads/memory leak
SMP agent:
- Support stopping and resuming agent with `disconnectAgentClient` / `resumeAgentClient`
# 2.1.1
SMP server:

View File

@@ -1,5 +1,5 @@
name: simplexmq
version: 2.1.1
version: 2.2.0
synopsis: SimpleXMQ message broker
description: |
This package includes <./docs/Simplex-Messaging-Server.html server>,

View File

@@ -5,7 +5,7 @@ cabal-version: 1.12
-- see: https://github.com/sol/hpack
name: simplexmq
version: 2.1.1
version: 2.2.0
synopsis: SimpleXMQ message broker
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
<./docs/Simplex-Messaging-Client.html client> and

View File

@@ -96,7 +96,7 @@ supportedSMPVersions :: VersionRange
supportedSMPVersions = mkVersionRange 1 2
simplexMQVersion :: String
simplexMQVersion = "2.1.1"
simplexMQVersion = "2.2.0"
-- * Transport connection class

View File

@@ -12,20 +12,22 @@ module Simplex.Messaging.Transport.Server
)
where
import Control.Concurrent.STM (stateTVar)
import Control.Monad.Except
import Control.Monad.IO.Unlift
import qualified Crypto.Store.X509 as SX
import Data.Default (def)
import Data.Set (Set)
import qualified Data.Set as S
import qualified Data.X509 as X
import Data.X509.Validation (Fingerprint (..))
import qualified Data.X509.Validation as XV
import Network.Socket
import qualified Network.TLS as T
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport
import Simplex.Messaging.Util (catchAll_)
import System.Exit (exitFailure)
import System.Mem.Weak (Weak, deRefWeak)
import UnliftIO.Concurrent
import qualified UnliftIO.Exception as E
import UnliftIO.STM
@@ -36,37 +38,29 @@ import UnliftIO.STM
runTransportServer :: forall c m. (Transport c, MonadUnliftIO m) => TMVar Bool -> ServiceName -> T.ServerParams -> (c -> m ()) -> m ()
runTransportServer started port serverParams server = do
u <- askUnliftIO
liftIO $ do
clients <- newTVarIO S.empty
liftIO . runTCPServer started port $ \conn ->
E.bracket
(startTCPServer started port)
(closeServer started clients)
$ \sock -> forever . E.bracketOnError (accept sock) (close . fst) $ \(conn, _peer) -> do
-- catchAll_ is needed here in case the connection was closed earlier
tid <- forkFinally (connectClient u conn) (const . liftIO $ gracefulClose conn 5000 `catchAll_` pure ())
atomically . modifyTVar' clients $ S.insert tid
where
connectClient :: UnliftIO m -> Socket -> IO ()
connectClient u conn =
E.bracket
(connectTLS serverParams conn >>= getServerConnection)
closeConnection
(unliftIO u . server)
(connectTLS serverParams conn >>= getServerConnection)
closeConnection
(unliftIO u . server)
-- | Run TCP server without TLS - only used in SimpleX Chat
-- | Run TCP server without TLS
runTCPServer :: TMVar Bool -> ServiceName -> (Socket -> IO ()) -> IO ()
runTCPServer started port server = do
clients <- newTVarIO S.empty
clients <- atomically TM.empty
clientId <- newTVarIO 0
E.bracket
(startTCPServer started port)
(closeServer started clients)
$ \sock -> forever . E.bracketOnError (accept sock) (close . fst) $ \(conn, _peer) -> do
tid <- forkFinally (server conn) (const $ gracefulClose conn 5000)
atomically . modifyTVar' clients $ S.insert tid
-- catchAll_ is needed here in case the connection was closed earlier
cId <- atomically $ stateTVar clientId $ \cId -> (cId + 1, cId + 1)
tId <- mkWeakThreadId =<< forkFinally (server conn) (const $ gracefulClose conn 5000 `catchAll_` atomically (TM.delete cId clients))
atomically $ TM.insert cId tId clients
closeServer :: TMVar Bool -> TVar (Set ThreadId) -> Socket -> IO ()
closeServer :: TMVar Bool -> TMap Int (Weak ThreadId) -> Socket -> IO ()
closeServer started clients sock = do
readTVarIO clients >>= mapM_ killThread
readTVarIO clients >>= mapM_ (deRefWeak >=> mapM_ killThread)
close sock
void . atomically $ tryPutTMVar started False