diff --git a/CHANGELOG.md b/CHANGELOG.md index c7af5a10a..048edcb25 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +# 2.2.0 + +SMP server: + +- Fix sockets/threads/memory leak + +SMP agent: + +- Support stopping and resuming agent with `disconnectAgentClient` / `resumeAgentClient` + # 2.1.1 SMP server: diff --git a/package.yaml b/package.yaml index 177d4ad86..4f05628ea 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: simplexmq -version: 2.1.1 +version: 2.2.0 synopsis: SimpleXMQ message broker description: | This package includes <./docs/Simplex-Messaging-Server.html server>, diff --git a/simplexmq.cabal b/simplexmq.cabal index c6e50f2ed..99cc897f6 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -5,7 +5,7 @@ cabal-version: 1.12 -- see: https://github.com/sol/hpack name: simplexmq -version: 2.1.1 +version: 2.2.0 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and diff --git a/src/Simplex/Messaging/Transport.hs b/src/Simplex/Messaging/Transport.hs index ac7f34c9a..221db1b65 100644 --- a/src/Simplex/Messaging/Transport.hs +++ b/src/Simplex/Messaging/Transport.hs @@ -96,7 +96,7 @@ supportedSMPVersions :: VersionRange supportedSMPVersions = mkVersionRange 1 2 simplexMQVersion :: String -simplexMQVersion = "2.1.1" +simplexMQVersion = "2.2.0" -- * Transport connection class diff --git a/src/Simplex/Messaging/Transport/Server.hs b/src/Simplex/Messaging/Transport/Server.hs index c2e12aff0..3c1604a2f 100644 --- a/src/Simplex/Messaging/Transport/Server.hs +++ b/src/Simplex/Messaging/Transport/Server.hs @@ -12,20 +12,22 @@ module Simplex.Messaging.Transport.Server ) where +import Control.Concurrent.STM (stateTVar) import Control.Monad.Except import Control.Monad.IO.Unlift import qualified Crypto.Store.X509 as SX import Data.Default (def) -import Data.Set (Set) -import qualified Data.Set as S import qualified Data.X509 as X import Data.X509.Validation (Fingerprint (..)) import qualified Data.X509.Validation as XV import Network.Socket import qualified Network.TLS as T +import Simplex.Messaging.TMap (TMap) +import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport import Simplex.Messaging.Util (catchAll_) import System.Exit (exitFailure) +import System.Mem.Weak (Weak, deRefWeak) import UnliftIO.Concurrent import qualified UnliftIO.Exception as E import UnliftIO.STM @@ -36,37 +38,29 @@ import UnliftIO.STM runTransportServer :: forall c m. (Transport c, MonadUnliftIO m) => TMVar Bool -> ServiceName -> T.ServerParams -> (c -> m ()) -> m () runTransportServer started port serverParams server = do u <- askUnliftIO - liftIO $ do - clients <- newTVarIO S.empty + liftIO . runTCPServer started port $ \conn -> E.bracket - (startTCPServer started port) - (closeServer started clients) - $ \sock -> forever . E.bracketOnError (accept sock) (close . fst) $ \(conn, _peer) -> do - -- catchAll_ is needed here in case the connection was closed earlier - tid <- forkFinally (connectClient u conn) (const . liftIO $ gracefulClose conn 5000 `catchAll_` pure ()) - atomically . modifyTVar' clients $ S.insert tid - where - connectClient :: UnliftIO m -> Socket -> IO () - connectClient u conn = - E.bracket - (connectTLS serverParams conn >>= getServerConnection) - closeConnection - (unliftIO u . server) + (connectTLS serverParams conn >>= getServerConnection) + closeConnection + (unliftIO u . server) --- | Run TCP server without TLS - only used in SimpleX Chat +-- | Run TCP server without TLS runTCPServer :: TMVar Bool -> ServiceName -> (Socket -> IO ()) -> IO () runTCPServer started port server = do - clients <- newTVarIO S.empty + clients <- atomically TM.empty + clientId <- newTVarIO 0 E.bracket (startTCPServer started port) (closeServer started clients) $ \sock -> forever . E.bracketOnError (accept sock) (close . fst) $ \(conn, _peer) -> do - tid <- forkFinally (server conn) (const $ gracefulClose conn 5000) - atomically . modifyTVar' clients $ S.insert tid + -- catchAll_ is needed here in case the connection was closed earlier + cId <- atomically $ stateTVar clientId $ \cId -> (cId + 1, cId + 1) + tId <- mkWeakThreadId =<< forkFinally (server conn) (const $ gracefulClose conn 5000 `catchAll_` atomically (TM.delete cId clients)) + atomically $ TM.insert cId tId clients -closeServer :: TMVar Bool -> TVar (Set ThreadId) -> Socket -> IO () +closeServer :: TMVar Bool -> TMap Int (Weak ThreadId) -> Socket -> IO () closeServer started clients sock = do - readTVarIO clients >>= mapM_ killThread + readTVarIO clients >>= mapM_ (deRefWeak >=> mapM_ killThread) close sock void . atomically $ tryPutTMVar started False