diff --git a/package.yaml b/package.yaml index d4ed1c127..04d8c773d 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: simplexmq -version: 4.3.0 +version: 4.3.1 synopsis: SimpleXMQ message broker description: | This package includes <./docs/Simplex-Messaging-Server.html server>, diff --git a/simplexmq.cabal b/simplexmq.cabal index 7f1f5929d..f9998b7ac 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -5,7 +5,7 @@ cabal-version: 1.12 -- see: https://github.com/sol/hpack name: simplexmq -version: 4.3.0 +version: 4.3.1 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and @@ -57,6 +57,7 @@ library Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220915_connection_queues Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230110_users Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230117_fkey_indexes + Simplex.Messaging.Agent.TAsyncs Simplex.Messaging.Agent.TRcvQueues Simplex.Messaging.Client Simplex.Messaging.Client.Agent diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 641b7e86e..cffe673e8 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -96,7 +96,7 @@ import Data.Bifunctor (bimap, first, second) import Data.ByteString.Base64 import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B -import Data.Either (partitionEithers) +import Data.Either (lefts, partitionEithers) import Data.Functor (($>)) import Data.List (foldl', partition) import Data.List.NonEmpty (NonEmpty (..), (<|)) @@ -117,6 +117,7 @@ import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore (..), withTransaction) +import Simplex.Messaging.Agent.TAsyncs import Simplex.Messaging.Agent.TRcvQueues (TRcvQueues) import qualified Simplex.Messaging.Agent.TRcvQueues as RQ import Simplex.Messaging.Client @@ -157,7 +158,7 @@ import Simplex.Messaging.Transport.Client (TransportHost) import Simplex.Messaging.Util import Simplex.Messaging.Version import System.Timeout (timeout) -import UnliftIO (async, mapConcurrently) +import UnliftIO (mapConcurrently) import qualified UnliftIO.Exception as E import UnliftIO.STM @@ -201,8 +202,8 @@ data AgentClient = AgentClient connLocks :: TMap ConnId Lock, -- locks to prevent concurrent reconnections to SMP servers reconnectLocks :: TMap SMPTransportSession Lock, - reconnections :: TVar [Async ()], - asyncClients :: TVar [Async ()], + reconnections :: TAsyncs, + asyncClients :: TAsyncs, agentStats :: TMap AgentStatsKey (TVar Int), clientId :: Int, agentEnv :: Env @@ -271,8 +272,8 @@ newAgentClient InitialAgentServers {smp, ntf, netCfg} agentEnv = do getMsgLocks <- TM.empty connLocks <- TM.empty reconnectLocks <- TM.empty - reconnections <- newTVar [] - asyncClients <- newTVar [] + reconnections <- newTAsyncs + asyncClients <- newTAsyncs agentStats <- TM.empty clientId <- stateTVar (clientCounter agentEnv) $ \i -> let i' = i + 1 in (i', i') return AgentClient {active, rcvQ, subQ, msgQ, smpServers, smpClients, ntfServers, ntfClients, useNetworkConfig, subscrConns, activeSubs, pendingSubs, pendingMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, connCmdsQueued, asyncCmdQueues, asyncCmdProcesses, ntfNetworkOp, rcvNetworkOp, msgDeliveryOp, sndNetworkOp, databaseOp, agentState, getMsgLocks, connLocks, reconnectLocks, reconnections, asyncClients, agentStats, clientId, agentEnv} @@ -297,7 +298,7 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} tSess@(userId, srv, unlessM (readTVarIO active) . throwError $ INTERNAL "agent is stopped" atomically (getClientVar tSess smpClients) >>= either - (newProtocolClient c tSess smpClients connectClient reconnectClient) + (newProtocolClient c tSess smpClients connectClient reconnectSMPClient) (waitForProtocolClient c tSess) where connectClient :: m SMPClient @@ -314,9 +315,11 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} tSess@(userId, srv, removeClientAndSubs :: IO ([RcvQueue], [ConnId]) removeClientAndSubs = atomically $ do TM.delete tSess smpClients - (qs, conns) <- RQ.getDelSessQueues tSess $ activeSubs c + qs <- RQ.getDelSessQueues tSess $ activeSubs c mapM_ (`RQ.addQueue` pendingSubs c) qs - pure (qs, S.toList conns) + let cs = S.fromList $ map qConnId qs + cs' <- RQ.getConns $ activeSubs c + pure (qs, S.toList $ cs `S.difference` cs') serverDown :: ([RcvQueue], [ConnId]) -> IO () serverDown (qs, conns) = whenM (readTVarIO active) $ do @@ -325,36 +328,36 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} tSess@(userId, srv, unless (null conns) $ notifySub "" $ DOWN srv conns unless (null qs) $ do atomically $ mapM_ (releaseGetLock c) qs - unliftIO u reconnectServer + unliftIO u $ reconnectServer c tSess - reconnectServer :: m () - reconnectServer = do - a <- async tryReconnectClient - atomically $ modifyTVar' (reconnections c) (a :) + notifySub :: ConnId -> ACommand 'Agent -> IO () + notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, cmd) - tryReconnectClient :: m () - tryReconnectClient = do - ri <- asks $ reconnectInterval . config - withRetryInterval ri $ \loop -> - reconnectClient `catchError` const loop - - reconnectClient :: m () - reconnectClient = - withLockMap_ (reconnectLocks c) tSess "reconnect" $ - atomically (RQ.getSessQueues tSess $ pendingSubs c) >>= mapM_ resubscribe . L.nonEmpty - where - resubscribe :: NonEmpty RcvQueue -> m () - resubscribe qs = do - cs <- atomically . RQ.getConns $ activeSubs c - rs <- subscribeQueues c $ L.toList qs - let (errs, okConns) = partitionEithers $ map (\(RcvQueue {connId}, r) -> bimap (connId,) (const connId) r) rs - liftIO $ do - let conns = S.toList $ S.fromList okConns `S.difference` cs - unless (null conns) $ notifySub "" $ UP srv conns - let (tempErrs, finalErrs) = partition (temporaryAgentError . snd) errs - liftIO $ mapM_ (\(connId, e) -> notifySub connId $ ERR e) finalErrs - mapM_ (throwError . snd) $ listToMaybe tempErrs +reconnectServer :: AgentMonad m => AgentClient -> SMPTransportSession -> m () +reconnectServer c tSess = newAsyncAction tryReconnectSMPClient $ reconnections c + where + tryReconnectSMPClient aId = do + ri <- asks $ reconnectInterval . config + withRetryInterval ri $ \loop -> + reconnectSMPClient c tSess `catchError` const loop + atomically . removeAsyncAction aId $ reconnections c +reconnectSMPClient :: forall m. AgentMonad m => AgentClient -> SMPTransportSession -> m () +reconnectSMPClient c tSess@(_, srv, _) = + withLockMap_ (reconnectLocks c) tSess "reconnect" $ + atomically (RQ.getSessQueues tSess $ pendingSubs c) >>= mapM_ resubscribe . L.nonEmpty + where + resubscribe :: NonEmpty RcvQueue -> m () + resubscribe qs = do + cs <- atomically . RQ.getConns $ activeSubs c + rs <- subscribeQueues c $ L.toList qs + let (errs, okConns) = partitionEithers $ map (\(RcvQueue {connId}, r) -> bimap (connId,) (const connId) r) rs + liftIO $ do + let conns = S.toList $ S.fromList okConns `S.difference` cs + unless (null conns) $ notifySub "" $ UP srv conns + let (tempErrs, finalErrs) = partition (temporaryAgentError . snd) errs + liftIO $ mapM_ (\(connId, e) -> notifySub connId $ ERR e) finalErrs + mapM_ (throwError . snd) $ listToMaybe tempErrs notifySub :: ConnId -> ACommand 'Agent -> IO () notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, cmd) @@ -363,7 +366,7 @@ getNtfServerClient c@AgentClient {active, ntfClients} tSess@(userId, srv, _) = d unlessM (readTVarIO active) . throwError $ INTERNAL "agent is stopped" atomically (getClientVar tSess ntfClients) >>= either - (newProtocolClient c tSess ntfClients connectClient $ pure ()) + (newProtocolClient c tSess ntfClients connectClient $ \_ _ -> pure ()) (waitForProtocolClient c tSess) where connectClient :: m NtfClient @@ -403,7 +406,7 @@ newProtocolClient :: TransportSession msg -> TMap (TransportSession msg) (ClientVar msg) -> m (ProtocolClient msg) -> - m () -> + (AgentClient -> TransportSession msg -> m ()) -> ClientVar msg -> m (ProtocolClient msg) newProtocolClient c tSess@(userId, srv, entityId_) clients connectClient reconnectClient clientVar = tryConnectClient pure tryConnectAsync @@ -426,13 +429,12 @@ newProtocolClient c tSess@(userId, srv, entityId_) clients connectClient reconne TM.delete tSess clients throwError e tryConnectAsync :: m () - tryConnectAsync = do - a <- async connectAsync - atomically $ modifyTVar' (asyncClients c) (a :) - connectAsync :: m () - connectAsync = do + tryConnectAsync = newAsyncAction connectAsync $ asyncClients c + connectAsync :: Int -> m () + connectAsync aId = do ri <- asks $ reconnectInterval . config - withRetryInterval ri $ \loop -> void $ tryConnectClient (const reconnectClient) loop + withRetryInterval ri $ \loop -> void $ tryConnectClient (const $ reconnectClient c tSess) loop + atomically . removeAsyncAction aId $ asyncClients c hostEvent :: forall msg. ProtocolTypeI (ProtoType msg) => (AProtocolType -> TransportHost -> ACommand 'Agent) -> ProtocolClient msg -> ACommand 'Agent hostEvent event client = event (AProtocolType $ protocolTypeI @(ProtoType msg)) $ transportHost' client @@ -448,8 +450,8 @@ closeAgentClient c = liftIO $ do atomically $ writeTVar (active c) False closeProtocolServerClients c smpClients closeProtocolServerClients c ntfClients - cancelActions $ reconnections c - cancelActions $ asyncClients c + cancelActions . actions $ reconnections c + cancelActions . actions $ asyncClients c cancelActions $ smpQueueMsgDeliveries c cancelActions $ asyncCmdProcesses c atomically . RQ.clear $ activeSubs c @@ -647,9 +649,13 @@ subscribeQueue c rq@RcvQueue {connId, server, rcvPrivateKey, rcvId} = do atomically $ do modifyTVar' (subscrConns c) $ S.insert connId RQ.addQueue rq $ pendingSubs c - withSMPClient c rq "SUB" $ \smp -> + r <- withSMPClient c rq "SUB" $ \smp -> liftIO (runExceptT (subscribeSMPQueue smp rcvPrivateKey rcvId) >>= processSubResult c rq) - >>= either throwError pure + case r of + Left e -> do + tSess <- mkSMPTransportSession c rq + reconnectServer c tSess >> throwError (protocolClientError SMP (B.unpack $ strEncode server) e) + _ -> pure () processSubResult :: AgentClient -> RcvQueue -> Either ProtocolClientError () -> IO (Either ProtocolClientError ()) processSubResult c rq r = do @@ -697,15 +703,16 @@ subscribeQueues c qs = do in M.alter (Just . maybe [rq] (rq <|)) tSess m subscribeQueues_ :: AgentMonad m => AgentClient -> SMPTransportSession -> NonEmpty RcvQueue -> m (NonEmpty (RcvQueue, Either AgentErrorType ())) -subscribeQueues_ c tSess@(userId, srv, _) qs = +subscribeQueues_ c tSess@(userId, srv, _) qs = do tryError (getSMPServerClient c tSess) >>= \case Left e -> pure $ L.map (,Left e) qs - Right smp -> liftIO $ do + Right smp -> do logServer "-->" c srv (bshow (length qs) <> " queues") "SUB" let n = (length qs - 1) `div` 90 + 1 - incClientStatN c userId smp n "SUBS" "OK" - rs <- L.zip qs <$> subscribeSMPQueues smp (L.map queueCreds qs) - mapM_ (uncurry $ processSubResult c) rs + liftIO $ incClientStatN c userId smp n "SUBS" "OK" + rs <- liftIO $ L.zip qs <$> subscribeSMPQueues smp (L.map queueCreds qs) + liftIO $ mapM_ (uncurry $ processSubResult c) rs + when (any temporaryClientError . lefts . map snd $ L.toList rs) $ reconnectServer c tSess pure $ L.map (second . first $ protocolClientError SMP $ clientServer smp) rs where queueCreds RcvQueue {rcvPrivateKey, rcvId} = (rcvPrivateKey, rcvId) diff --git a/src/Simplex/Messaging/Agent/TAsyncs.hs b/src/Simplex/Messaging/Agent/TAsyncs.hs new file mode 100644 index 000000000..80fc41840 --- /dev/null +++ b/src/Simplex/Messaging/Agent/TAsyncs.hs @@ -0,0 +1,25 @@ +module Simplex.Messaging.Agent.TAsyncs where + +import Control.Concurrent.STM (stateTVar) +import Control.Monad.IO.Unlift (MonadUnliftIO) +import Simplex.Messaging.TMap (TMap) +import qualified Simplex.Messaging.TMap as TM +import UnliftIO.Async (Async, async) +import UnliftIO.STM + +data TAsyncs = TAsyncs + { actionId :: TVar Int, + actions :: TMap Int (Async ()) + } + +newTAsyncs :: STM TAsyncs +newTAsyncs = TAsyncs <$> newTVar 0 <*> TM.empty + +newAsyncAction :: MonadUnliftIO m => (Int -> m ()) -> TAsyncs -> m () +newAsyncAction action as = do + aId <- atomically $ stateTVar (actionId as) $ \i -> (i + 1, i + 1) + a <- async $ action aId + atomically $ TM.insert aId a $ actions as + +removeAsyncAction :: Int -> TAsyncs -> STM () +removeAsyncAction aId = TM.delete aId . actions diff --git a/src/Simplex/Messaging/Agent/TRcvQueues.hs b/src/Simplex/Messaging/Agent/TRcvQueues.hs index ff525d4bb..aad0f16e6 100644 --- a/src/Simplex/Messaging/Agent/TRcvQueues.hs +++ b/src/Simplex/Messaging/Agent/TRcvQueues.hs @@ -1,5 +1,3 @@ -{-# LANGUAGE NamedFieldPuns #-} - module Simplex.Messaging.Agent.TRcvQueues ( TRcvQueues, empty, @@ -52,11 +50,11 @@ getSessQueues tSess (TRcvQueues qs) = M.foldl' addQ [] <$> readTVar qs where addQ qs' rq = if rq `isSession` tSess then rq : qs' else qs' -getDelSessQueues :: (UserId, SMPServer, Maybe ConnId) -> TRcvQueues -> STM ([RcvQueue], Set ConnId) -getDelSessQueues tSess (TRcvQueues qs) = stateTVar qs $ M.foldl' addQ (([], S.empty), M.empty) +getDelSessQueues :: (UserId, SMPServer, Maybe ConnId) -> TRcvQueues -> STM [RcvQueue] +getDelSessQueues tSess (TRcvQueues qs) = stateTVar qs $ M.foldl' addQ ([], M.empty) where - addQ (removed@(remQs, remConns), qs') rq@RcvQueue {connId} - | rq `isSession` tSess = ((rq : remQs, S.insert connId remConns), qs') + addQ (removed, qs') rq + | rq `isSession` tSess = (rq : removed, qs') | otherwise = (removed, M.insert (qKey rq) rq qs') isSession :: RcvQueue -> (UserId, SMPServer, Maybe ConnId) -> Bool diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index 82170e90f..b462eaffa 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -47,6 +47,7 @@ data ServerStatsData = ServerStatsData _qCount :: Int, _msgCount :: Int } + deriving (Show) newServerStats :: UTCTime -> STM ServerStats newServerStats ts = do @@ -88,7 +89,7 @@ setServerStats s d = do writeTVar (qDeleted s) $! _qDeleted d writeTVar (msgSent s) $! _msgSent d writeTVar (msgRecv s) $! _msgRecv d - setPeriodStats (activeQueuesNtf s) (_activeQueuesNtf d) + setPeriodStats (activeQueues s) (_activeQueues d) writeTVar (msgSentNtf s) $! _msgSentNtf d writeTVar (msgRecvNtf s) $! _msgRecvNtf d setPeriodStats (activeQueuesNtf s) (_activeQueuesNtf d) @@ -152,6 +153,7 @@ data PeriodStatsData a = PeriodStatsData _week :: Set a, _month :: Set a } + deriving (Show) newPeriodStatsData :: PeriodStatsData a newPeriodStatsData = PeriodStatsData {_day = S.empty, _week = S.empty, _month = S.empty} diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index 66315336a..5a83e5f1e 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -11,6 +11,7 @@ module ServerTests where +import AgentTests.NotificationTests (removeFileIfExists) import Control.Concurrent (ThreadId, killThread, threadDelay) import Control.Concurrent.STM import Control.Exception (SomeException, try) @@ -20,6 +21,7 @@ import Data.Bifunctor (first) import Data.ByteString.Base64 import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B +import qualified Data.Set as S import SMPClient import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding @@ -28,6 +30,7 @@ import Simplex.Messaging.Parsers (parseAll) import Simplex.Messaging.Protocol import Simplex.Messaging.Server.Env.STM (ServerConfig (..)) import Simplex.Messaging.Server.Expiration +import Simplex.Messaging.Server.Stats (PeriodStatsData (..), ServerStatsData (..)) import Simplex.Messaging.Transport import System.Directory (removeFile) import System.TimeIt (timeItT) @@ -605,6 +608,10 @@ logSize f = testRestoreMessages :: ATransport -> Spec testRestoreMessages at@(ATransport t) = it "should store messages on exit and restore on start" $ do + removeFileIfExists testStoreLogFile + removeFileIfExists testStoreMsgsFile + removeFileIfExists testServerStatsBackupFile + (sPub, sKey) <- C.generateSignatureKeyPair C.SEd25519 recipientId <- newTVarIO "" recipientKey <- newTVarIO Nothing @@ -632,11 +639,15 @@ testRestoreMessages at@(ATransport t) = Resp "6" _ (ERR QUOTA) <- signSendRecv h sKey ("6", sId, _SEND "hello 6") pure () + rId <- readTVarIO recipientId + logSize testStoreLogFile `shouldReturn` 2 logSize testStoreMsgsFile `shouldReturn` 5 + logSize testServerStatsBackupFile `shouldReturn` 16 + Right stats1 <- strDecode <$> B.readFile testServerStatsBackupFile + checkStats stats1 [rId] 5 1 withSmpServerStoreMsgLogOn at testPort . runTest t $ \h -> do - rId <- readTVarIO recipientId Just rKey <- readTVarIO recipientKey Just dh <- readTVarIO dhShared let dec = decryptMsgV3 dh @@ -650,9 +661,11 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 1 -- the last message is not removed because it was not ACK'd logSize testStoreMsgsFile `shouldReturn` 3 + logSize testServerStatsBackupFile `shouldReturn` 16 + Right stats2 <- strDecode <$> B.readFile testServerStatsBackupFile + checkStats stats2 [rId] 5 3 withSmpServerStoreMsgLogOn at testPort . runTest t $ \h -> do - rId <- readTVarIO recipientId Just rKey <- readTVarIO recipientKey Just dh <- readTVarIO dhShared let dec = decryptMsgV3 dh @@ -667,9 +680,13 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 1 logSize testStoreMsgsFile `shouldReturn` 0 + logSize testServerStatsBackupFile `shouldReturn` 16 + Right stats3 <- strDecode <$> B.readFile testServerStatsBackupFile + checkStats stats3 [rId] 5 5 removeFile testStoreLogFile removeFile testStoreMsgsFile + removeFile testServerStatsBackupFile where runTest :: Transport c => TProxy c -> (THandle c -> IO ()) -> ThreadId -> Expectation runTest _ test' server = do @@ -679,6 +696,20 @@ testRestoreMessages at@(ATransport t) = runClient :: Transport c => TProxy c -> (THandle c -> IO ()) -> Expectation runClient _ test' = testSMPClient test' `shouldReturn` () +checkStats :: ServerStatsData -> [RecipientId] -> Int -> Int -> Expectation +checkStats s qs sent received = do + _qCreated s `shouldBe` length qs + _qSecured s `shouldBe` length qs + _qDeleted s `shouldBe` 0 + _msgSent s `shouldBe` sent + _msgRecv s `shouldBe` received + _msgSentNtf s `shouldBe` 0 + _msgRecvNtf s `shouldBe` 0 + let PeriodStatsData {_day, _week, _month} = _activeQueues s + S.toList _day `shouldBe` qs + S.toList _week `shouldBe` qs + S.toList _month `shouldBe` qs + testRestoreMessagesV2 :: ATransport -> Spec testRestoreMessagesV2 at@(ATransport t) = it "should store messages on exit and restore on start" $ do