From f6a321e198ca9d5b2e828ed102a882cfd4ec6533 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Thu, 30 Jun 2022 16:04:01 +0100 Subject: [PATCH] handle SMP errors in ntf subscriber (#441) --- src/Simplex/Messaging/Notifications/Server.hs | 50 ++++++++++++------- .../Messaging/Notifications/Server/Env.hs | 16 +++--- 2 files changed, 42 insertions(+), 24 deletions(-) diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index 481a0ef15..f6d3dbf37 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -38,8 +38,9 @@ import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (ATransport (..), THandle (..), TProxy, Transport (..)) import Simplex.Messaging.Transport.Server (runTransportServer) import Simplex.Messaging.Util +import System.Mem.Weak (deRefWeak) import UnliftIO (IOMode (..), async, uninterruptibleCancel) -import UnliftIO.Concurrent (forkFinally, forkIO, threadDelay) +import UnliftIO.Concurrent (forkIO, killThread, mkWeakThreadId, threadDelay) import UnliftIO.Exception import UnliftIO.STM @@ -57,8 +58,7 @@ ntfServer NtfServerConfig {transports} started = do ps <- asks pushServer subs <- readTVarIO =<< asks (subscriptions . store) void . forkIO $ resubscribe s subs - raceAny_ (ntfSubscriber s : ntfPush ps : map runServer transports) - `finally` withNtfLog closeStoreLog + raceAny_ (ntfSubscriber s : ntfPush ps : map runServer transports) `finally` stopServer where runServer :: (ServiceName, ATransport) -> m () runServer (tcpPort, ATransport t) = do @@ -72,6 +72,11 @@ ntfServer NtfServerConfig {transports} started = do Right th -> runNtfClientTransport th Left _ -> pure () + stopServer :: m () + stopServer = do + withNtfLog closeStoreLog + asks (smpSubscribers . subscriber) >>= readTVarIO >>= mapM_ (\SMPSubscriber {subThreadId} -> readTVarIO subThreadId >>= mapM_ (liftIO . deRefWeak >=> mapM_ killThread)) + resubscribe :: (MonadUnliftIO m, MonadReader NtfEnv m) => NtfSubscriber -> Map NtfSubscriptionId NtfSubData -> m () resubscribe NtfSubscriber {newSubQ} subs = do d <- asks $ resubscribeDelay . config @@ -90,34 +95,43 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge atomically (readTBQueue newSubQ) >>= \case sub@(NtfSub NtfSubData {smpQueue = SMPQueueNtf {smpServer}}) -> do SMPSubscriber {newSubQ = subscriberSubQ} <- getSMPSubscriber smpServer - atomically $ writeTBQueue subscriberSubQ sub + atomically $ writeTQueue subscriberSubQ sub getSMPSubscriber :: SMPServer -> m SMPSubscriber getSMPSubscriber smpServer = atomically (TM.lookup smpServer smpSubscribers) >>= maybe createSMPSubscriber pure where createSMPSubscriber = do - qSize <- asks $ subQSize . config - newSubscriber <- atomically $ newSMPSubscriber qSize - atomically $ TM.insert smpServer newSubscriber smpSubscribers - _ <- runSMPSubscriber newSubscriber `forkFinally` \_ -> atomically (TM.delete smpServer smpSubscribers >> failSubscriptions newSubscriber) - pure newSubscriber - -- TODO mark subscriptions as failed - failSubscriptions _ = pure () + sub@SMPSubscriber {subThreadId} <- atomically newSMPSubscriber + atomically $ TM.insert smpServer sub smpSubscribers + tId <- mkWeakThreadId =<< forkIO (runSMPSubscriber sub) + atomically . writeTVar subThreadId $ Just tId + pure sub runSMPSubscriber :: SMPSubscriber -> m () runSMPSubscriber SMPSubscriber {newSubQ = subscriberSubQ} = forever $ - atomically (peekTBQueue subscriberSubQ) >>= \case + atomically (peekTQueue subscriberSubQ) >>= \case NtfSub NtfSubData {smpQueue, notifierKey} -> do updateSubStatus smpQueue NSPending let SMPQueueNtf {smpServer, notifierId} = smpQueue liftIO (runExceptT $ subscribeQueue ca smpServer ((SPNotifier, notifierId), notifierKey)) >>= \case - Right _ -> do - updateSubStatus smpQueue NSActive - _ <- atomically $ readTBQueue subscriberSubQ - pure () - Left _e -> pure () + Right _ -> update smpQueue NSActive + Left err -> case err of + PCEProtocolError AUTH -> update smpQueue NSSMPAuth + PCEProtocolError e -> update smpQueue $ NSSMPErr e + PCEIOError e -> log' $ "IOError " <> T.pack (show e) + PCEResponseError e -> log' $ "ResponseError " <> T.pack (show e) + PCEUnexpectedResponse -> log' "UnexpectedResponse" + PCETransportError e -> log' $ "TransportError " <> T.pack (show e) + PCESignatureError e -> log' $ "SignatureError " <> T.pack (show e) + PCEResponseTimeout -> pure () + PCENetworkError -> pure () + where + update smpQueue status = do + updateSubStatus smpQueue status + void . atomically $ readTQueue subscriberSubQ + log' e = logError $ "SMPSubscriber subscribeQueue " <> e receiveSMP :: m () receiveSMP = forever $ do @@ -161,7 +175,7 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge PCEIOError e -> logErr e _ -> pure () where - logErr e = logError (T.pack $ "ntfSubscriber receiveAgent error: " <> show e) + logErr e = logError $ "ntfSubscriber receiveAgent error: " <> T.pack (show e) updateSubStatus smpQueue status = do st <- asks store diff --git a/src/Simplex/Messaging/Notifications/Server/Env.hs b/src/Simplex/Messaging/Notifications/Server/Env.hs index f1f59ad06..a6f08b682 100644 --- a/src/Simplex/Messaging/Notifications/Server/Env.hs +++ b/src/Simplex/Messaging/Notifications/Server/Env.hs @@ -7,6 +7,7 @@ module Simplex.Messaging.Notifications.Server.Env where +import Control.Concurrent (ThreadId) import Control.Concurrent.Async (Async) import Control.Monad.IO.Unlift import Crypto.Random @@ -30,6 +31,7 @@ import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (ATransport) import Simplex.Messaging.Transport.Server (loadFingerprint, loadTLSServerParams) import System.IO (IOMode (..)) +import System.Mem.Weak (Weak) import UnliftIO.STM data NtfServerConfig = NtfServerConfig @@ -93,14 +95,16 @@ newNtfSubscriber qSize smpAgentCfg = do smpAgent <- newSMPClientAgent smpAgentCfg pure NtfSubscriber {smpSubscribers, newSubQ, smpAgent} -newtype SMPSubscriber = SMPSubscriber - { newSubQ :: TBQueue (NtfEntityRec 'Subscription) +data SMPSubscriber = SMPSubscriber + { newSubQ :: TQueue (NtfEntityRec 'Subscription), + subThreadId :: TVar (Maybe (Weak ThreadId)) } -newSMPSubscriber :: Natural -> STM SMPSubscriber -newSMPSubscriber qSize = do - newSubQ <- newTBQueue qSize - pure SMPSubscriber {newSubQ} +newSMPSubscriber :: STM SMPSubscriber +newSMPSubscriber = do + newSubQ <- newTQueue + subThreadId <- newTVar Nothing + pure SMPSubscriber {newSubQ, subThreadId} data NtfPushServer = NtfPushServer { pushQ :: TBQueue (NtfTknData, PushNotification),