handle SMP errors in ntf subscriber (#441)

This commit is contained in:
Evgeny Poberezkin
2022-06-30 16:04:01 +01:00
committed by GitHub
parent c82fae72f2
commit f6a321e198
2 changed files with 42 additions and 24 deletions

View File

@@ -38,8 +38,9 @@ import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (ATransport (..), THandle (..), TProxy, Transport (..))
import Simplex.Messaging.Transport.Server (runTransportServer)
import Simplex.Messaging.Util
import System.Mem.Weak (deRefWeak)
import UnliftIO (IOMode (..), async, uninterruptibleCancel)
import UnliftIO.Concurrent (forkFinally, forkIO, threadDelay)
import UnliftIO.Concurrent (forkIO, killThread, mkWeakThreadId, threadDelay)
import UnliftIO.Exception
import UnliftIO.STM
@@ -57,8 +58,7 @@ ntfServer NtfServerConfig {transports} started = do
ps <- asks pushServer
subs <- readTVarIO =<< asks (subscriptions . store)
void . forkIO $ resubscribe s subs
raceAny_ (ntfSubscriber s : ntfPush ps : map runServer transports)
`finally` withNtfLog closeStoreLog
raceAny_ (ntfSubscriber s : ntfPush ps : map runServer transports) `finally` stopServer
where
runServer :: (ServiceName, ATransport) -> m ()
runServer (tcpPort, ATransport t) = do
@@ -72,6 +72,11 @@ ntfServer NtfServerConfig {transports} started = do
Right th -> runNtfClientTransport th
Left _ -> pure ()
stopServer :: m ()
stopServer = do
withNtfLog closeStoreLog
asks (smpSubscribers . subscriber) >>= readTVarIO >>= mapM_ (\SMPSubscriber {subThreadId} -> readTVarIO subThreadId >>= mapM_ (liftIO . deRefWeak >=> mapM_ killThread))
resubscribe :: (MonadUnliftIO m, MonadReader NtfEnv m) => NtfSubscriber -> Map NtfSubscriptionId NtfSubData -> m ()
resubscribe NtfSubscriber {newSubQ} subs = do
d <- asks $ resubscribeDelay . config
@@ -90,34 +95,43 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
atomically (readTBQueue newSubQ) >>= \case
sub@(NtfSub NtfSubData {smpQueue = SMPQueueNtf {smpServer}}) -> do
SMPSubscriber {newSubQ = subscriberSubQ} <- getSMPSubscriber smpServer
atomically $ writeTBQueue subscriberSubQ sub
atomically $ writeTQueue subscriberSubQ sub
getSMPSubscriber :: SMPServer -> m SMPSubscriber
getSMPSubscriber smpServer =
atomically (TM.lookup smpServer smpSubscribers) >>= maybe createSMPSubscriber pure
where
createSMPSubscriber = do
qSize <- asks $ subQSize . config
newSubscriber <- atomically $ newSMPSubscriber qSize
atomically $ TM.insert smpServer newSubscriber smpSubscribers
_ <- runSMPSubscriber newSubscriber `forkFinally` \_ -> atomically (TM.delete smpServer smpSubscribers >> failSubscriptions newSubscriber)
pure newSubscriber
-- TODO mark subscriptions as failed
failSubscriptions _ = pure ()
sub@SMPSubscriber {subThreadId} <- atomically newSMPSubscriber
atomically $ TM.insert smpServer sub smpSubscribers
tId <- mkWeakThreadId =<< forkIO (runSMPSubscriber sub)
atomically . writeTVar subThreadId $ Just tId
pure sub
runSMPSubscriber :: SMPSubscriber -> m ()
runSMPSubscriber SMPSubscriber {newSubQ = subscriberSubQ} =
forever $
atomically (peekTBQueue subscriberSubQ) >>= \case
atomically (peekTQueue subscriberSubQ) >>= \case
NtfSub NtfSubData {smpQueue, notifierKey} -> do
updateSubStatus smpQueue NSPending
let SMPQueueNtf {smpServer, notifierId} = smpQueue
liftIO (runExceptT $ subscribeQueue ca smpServer ((SPNotifier, notifierId), notifierKey)) >>= \case
Right _ -> do
updateSubStatus smpQueue NSActive
_ <- atomically $ readTBQueue subscriberSubQ
pure ()
Left _e -> pure ()
Right _ -> update smpQueue NSActive
Left err -> case err of
PCEProtocolError AUTH -> update smpQueue NSSMPAuth
PCEProtocolError e -> update smpQueue $ NSSMPErr e
PCEIOError e -> log' $ "IOError " <> T.pack (show e)
PCEResponseError e -> log' $ "ResponseError " <> T.pack (show e)
PCEUnexpectedResponse -> log' "UnexpectedResponse"
PCETransportError e -> log' $ "TransportError " <> T.pack (show e)
PCESignatureError e -> log' $ "SignatureError " <> T.pack (show e)
PCEResponseTimeout -> pure ()
PCENetworkError -> pure ()
where
update smpQueue status = do
updateSubStatus smpQueue status
void . atomically $ readTQueue subscriberSubQ
log' e = logError $ "SMPSubscriber subscribeQueue " <> e
receiveSMP :: m ()
receiveSMP = forever $ do
@@ -161,7 +175,7 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
PCEIOError e -> logErr e
_ -> pure ()
where
logErr e = logError (T.pack $ "ntfSubscriber receiveAgent error: " <> show e)
logErr e = logError $ "ntfSubscriber receiveAgent error: " <> T.pack (show e)
updateSubStatus smpQueue status = do
st <- asks store

View File

@@ -7,6 +7,7 @@
module Simplex.Messaging.Notifications.Server.Env where
import Control.Concurrent (ThreadId)
import Control.Concurrent.Async (Async)
import Control.Monad.IO.Unlift
import Crypto.Random
@@ -30,6 +31,7 @@ import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (ATransport)
import Simplex.Messaging.Transport.Server (loadFingerprint, loadTLSServerParams)
import System.IO (IOMode (..))
import System.Mem.Weak (Weak)
import UnliftIO.STM
data NtfServerConfig = NtfServerConfig
@@ -93,14 +95,16 @@ newNtfSubscriber qSize smpAgentCfg = do
smpAgent <- newSMPClientAgent smpAgentCfg
pure NtfSubscriber {smpSubscribers, newSubQ, smpAgent}
newtype SMPSubscriber = SMPSubscriber
{ newSubQ :: TBQueue (NtfEntityRec 'Subscription)
data SMPSubscriber = SMPSubscriber
{ newSubQ :: TQueue (NtfEntityRec 'Subscription),
subThreadId :: TVar (Maybe (Weak ThreadId))
}
newSMPSubscriber :: Natural -> STM SMPSubscriber
newSMPSubscriber qSize = do
newSubQ <- newTBQueue qSize
pure SMPSubscriber {newSubQ}
newSMPSubscriber :: STM SMPSubscriber
newSMPSubscriber = do
newSubQ <- newTQueue
subThreadId <- newTVar Nothing
pure SMPSubscriber {newSubQ, subThreadId}
data NtfPushServer = NtfPushServer
{ pushQ :: TBQueue (NtfTknData, PushNotification),