From 7a238812b76d2bae0ceb453dbf41e140dd65ca0e Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Fri, 8 Jul 2022 14:46:01 +0100 Subject: [PATCH] ntf server: fix resubscribing to SMP server after it was restarted, test (#465) --- src/Simplex/Messaging/Agent.hs | 10 ++++ src/Simplex/Messaging/Client/Agent.hs | 6 +-- src/Simplex/Messaging/Notifications/Server.hs | 11 ++-- .../Notifications/Server/Push/APNS.hs | 2 + tests/AgentTests/NotificationTests.hs | 53 +++++++++++++++++-- 5 files changed, 71 insertions(+), 11 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index ce0fdcd66..cd7269474 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -59,6 +59,7 @@ module Simplex.Messaging.Agent checkNtfToken, deleteNtfToken, getNtfToken, + getNtfTokenData, deleteNtfSub, activateAgent, suspendAgent, @@ -207,6 +208,9 @@ deleteNtfToken c = withAgentEnv c . deleteNtfToken' c getNtfToken :: AgentErrorMonad m => AgentClient -> m (DeviceToken, NtfTknStatus, NotificationsMode) getNtfToken c = withAgentEnv c $ getNtfToken' c +getNtfTokenData :: AgentErrorMonad m => AgentClient -> m NtfToken +getNtfTokenData c = withAgentEnv c $ getNtfTokenData' c + -- | Delete notification subscription for connection deleteNtfSub :: AgentErrorMonad m => AgentClient -> ConnId -> m () deleteNtfSub c = withAgentEnv c . deleteNtfSub' c @@ -745,6 +749,12 @@ getNtfToken' c = Just NtfToken {deviceToken, ntfTknStatus, ntfMode} -> pure (deviceToken, ntfTknStatus, ntfMode) _ -> throwError $ CMD PROHIBITED +getNtfTokenData' :: AgentMonad m => AgentClient -> m NtfToken +getNtfTokenData' c = + withStore' c getSavedNtfToken >>= \case + Just tkn -> pure tkn + _ -> throwError $ CMD PROHIBITED + -- | Delete notification subscription for connection, in Reader monad deleteNtfSub' :: AgentMonad m => AgentClient -> ConnId -> m () deleteNtfSub' _c connId = do diff --git a/src/Simplex/Messaging/Client/Agent.hs b/src/Simplex/Messaging/Client/Agent.hs index bdb118e6f..664959eb2 100644 --- a/src/Simplex/Messaging/Client/Agent.hs +++ b/src/Simplex/Messaging/Client/Agent.hs @@ -30,7 +30,7 @@ import Simplex.Messaging.Protocol (BrokerMsg, ProtocolServer (..), QueueId, SMPS import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport -import Simplex.Messaging.Util (catchAll_, tryE, whenM, ($>>=)) +import Simplex.Messaging.Util (catchAll_, tryE, unlessM, ($>>=)) import System.Timeout (timeout) import UnliftIO (async, forConcurrently_) import UnliftIO.Exception (Exception) @@ -47,7 +47,7 @@ data SMPClientAgentEvent | CASubError SMPServer SMPSub ProtocolClientError data SMPSubParty = SPRecipient | SPNotifier - deriving (Eq, Ord) + deriving (Eq, Ord, Show) type SMPSub = (SMPSubParty, QueueId) @@ -203,7 +203,7 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, msgQ} srv = notify $ CAReconnected srv cs <- atomically $ mapM readTVar =<< TM.lookup srv (pendingSrvSubs ca) forConcurrently_ (maybe [] M.assocs cs) $ \sub@(s, _) -> - whenM (atomically $ hasSub (srvSubs ca) srv s) $ + unlessM (atomically $ hasSub (srvSubs ca) srv s) $ subscribe_ smp sub `catchE` handleError s where subscribe_ :: SMPClient -> (SMPSub, C.APrivateSignKey) -> ExceptT ProtocolClientError IO () diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index 70c5832b3..254e8707c 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -31,7 +31,7 @@ import Simplex.Messaging.Notifications.Server.Push.APNS (PNMessageData (..), Pus import Simplex.Messaging.Notifications.Server.Store import Simplex.Messaging.Notifications.Server.StoreLog import Simplex.Messaging.Notifications.Transport -import Simplex.Messaging.Protocol (ErrorType (..), SMPServer, SignedTransmission, Transmission, encodeTransmission, tGet, tPut) +import Simplex.Messaging.Protocol (ErrorType (..), ProtocolServer (host), SMPServer, SignedTransmission, Transmission, encodeTransmission, tGet, tPut) import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Server import qualified Simplex.Messaging.TMap as TM @@ -147,16 +147,19 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge forever $ atomically (readTBQueue agentQ) >>= \case CAConnected _ -> pure () - CADisconnected srv subs -> + CADisconnected srv subs -> do + logInfo . T.pack $ "SMP server disconnected " <> host srv <> " (" <> show (length subs) <> ") subscriptions" forM_ subs $ \(_, ntfId) -> do let smpQueue = SMPQueueNtf srv ntfId updateSubStatus smpQueue NSInactive - CAReconnected _ -> pure () + CAReconnected srv -> + logInfo $ "SMP server reconnected " <> T.pack (host srv) CAResubscribed srv sub -> do let ntfId = snd sub smpQueue = SMPQueueNtf srv ntfId updateSubStatus smpQueue NSActive - CASubError srv (_, ntfId) err -> + CASubError srv (_, ntfId) err -> do + logError . T.pack $ "SMP subscription error on server " <> host srv <> ": " <> show err handleSubError (SMPQueueNtf srv ntfId) err handleSubError :: SMPQueueNtf -> ProtocolClientError -> m () diff --git a/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs b/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs index 13d22b594..8f89594e0 100644 --- a/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs +++ b/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs @@ -99,6 +99,7 @@ data PushNotification | PNMessage PNMessageData | PNAlert Text | PNCheckMessages + deriving (Show) data PNMessageData = PNMessageData { smpQueue :: SMPQueueNtf, @@ -106,6 +107,7 @@ data PNMessageData = PNMessageData nmsgNonce :: C.CbNonce, encNMsgMeta :: EncNMsgMeta } + deriving (Show) instance StrEncoding PNMessageData where strEncode PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} = diff --git a/tests/AgentTests/NotificationTests.hs b/tests/AgentTests/NotificationTests.hs index c7e8c9419..10e0ce7a5 100644 --- a/tests/AgentTests/NotificationTests.hs +++ b/tests/AgentTests/NotificationTests.hs @@ -14,19 +14,22 @@ import Control.Concurrent (killThread, threadDelay) import Control.Monad.Except import qualified Data.Aeson as J import qualified Data.Aeson.Types as JT -import Data.Bifunctor (bimap) +import Data.Bifunctor (bimap, first) import qualified Data.ByteString.Base64.URL as U import Data.ByteString.Char8 (ByteString) import Data.Text.Encoding (encodeUtf8) import NtfClient import SMPAgentClient (agentCfg, initAgentServers, testDB, testDB2) -import SMPClient (withSmpServer) +import SMPClient (testPort, withSmpServer, withSmpServerStoreLogOn) import Simplex.Messaging.Agent +import Simplex.Messaging.Agent.Client (AgentClient) import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..)) import Simplex.Messaging.Agent.Protocol import qualified Simplex.Messaging.Crypto as C +import Simplex.Messaging.Encoding.String import Simplex.Messaging.Notifications.Protocol import Simplex.Messaging.Notifications.Server.Push.APNS +import Simplex.Messaging.Notifications.Types (NtfToken (..)) import Simplex.Messaging.Protocol (ErrorType (AUTH), MsgFlags (MsgFlags), SMPMsgMeta (..)) import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Transport (ATransport) @@ -73,11 +76,15 @@ notificationTests t = withSmpServer t $ withAPNSMockServer $ \apns -> withNtfServer t $ testChangeToken apns - describe "Notifications server store log" $ do + describe "Notifications server store log" $ it "should save and restore tokens and subscriptions" $ \_ -> withSmpServer t $ withAPNSMockServer $ \apns -> testNotificationsStoreLog t apns + describe "Notifications after SMP server restart" $ + it "should resume subscriptions after SMP server is restarted" $ \_ -> + withAPNSMockServer $ \apns -> + withNtfServer t $ testNotificationsSMPRestart t apns testNotificationToken :: APNSMockServer -> IO () testNotificationToken APNSMockServer {apnsQ} = do @@ -430,8 +437,8 @@ testNotificationsStoreLog t APNSMockServer {apnsQ} = do alice <- getSMPAgentClient agentCfg initAgentServers bob <- getSMPAgentClient agentCfg {dbFile = testDB2} initAgentServers Right (aliceId, bobId) <- withNtfServerStoreLog t $ \threadId -> runExceptT $ do - _ <- registerTestToken alice "abcd" NMInstant apnsQ (aliceId, bobId) <- makeConnection alice bob + _ <- registerTestToken alice "abcd" NMInstant apnsQ liftIO $ threadDelay 250000 4 <- sendMessage bob aliceId (SMP.MsgFlags True) "hello" get bob ##> ("", aliceId, SENT 4) @@ -452,6 +459,37 @@ testNotificationsStoreLog t APNSMockServer {apnsQ} = do liftIO $ killThread threadId pure () +testNotificationsSMPRestart :: ATransport -> APNSMockServer -> IO () +testNotificationsSMPRestart t APNSMockServer {apnsQ} = do + alice <- getSMPAgentClient agentCfg initAgentServers + bob <- getSMPAgentClient agentCfg {dbFile = testDB2} initAgentServers + Right (aliceId, bobId) <- withSmpServerStoreLogOn t testPort $ \threadId -> runExceptT $ do + (aliceId, bobId) <- makeConnection alice bob + _ <- registerTestToken alice "abcd" NMInstant apnsQ + liftIO $ threadDelay 250000 + 4 <- sendMessage bob aliceId (SMP.MsgFlags True) "hello" + get bob ##> ("", aliceId, SENT 4) + void $ messageNotification apnsQ + get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False + ackMessage alice bobId 4 + liftIO $ killThread threadId + pure (aliceId, bobId) + + Right () <- runExceptT $ do + get alice =##> \case ("", "", DOWN _ [c]) -> c == bobId; _ -> False + get bob =##> \case ("", "", DOWN _ [c]) -> c == aliceId; _ -> False + + Right () <- withSmpServerStoreLogOn t testPort $ \threadId -> runExceptT $ do + get alice =##> \case ("", "", UP _ [c]) -> c == bobId; _ -> False + get bob =##> \case ("", "", UP _ [c]) -> c == aliceId; _ -> False + liftIO $ threadDelay 1000000 + 5 <- sendMessage bob aliceId (SMP.MsgFlags True) "hello again" + get bob ##> ("", aliceId, SENT 5) + _ <- messageNotificationData alice apnsQ + get alice =##> \case ("", c, Msg "hello again") -> c == bobId; _ -> False + liftIO $ killThread threadId + pure () + messageNotification :: TBQueue APNSMockRequest -> ExceptT AgentErrorType IO (C.CbNonce, ByteString) messageNotification apnsQ = do 1000000 `timeout` atomically (readTBQueue apnsQ) >>= \case @@ -463,6 +501,13 @@ messageNotification apnsQ = do pure (nonce, message) _ -> error "bad notification" +messageNotificationData :: AgentClient -> TBQueue APNSMockRequest -> ExceptT AgentErrorType IO PNMessageData +messageNotificationData c apnsQ = do + (nonce, message) <- messageNotification apnsQ + NtfToken {ntfDhSecret = Just dhSecret} <- getNtfTokenData c + Right pnMsgData <- liftEither . first INTERNAL $ Right . strDecode =<< first show (C.cbDecrypt dhSecret nonce message) + pure pnMsgData + noNotification :: TBQueue APNSMockRequest -> ExceptT AgentErrorType IO () noNotification apnsQ = do 500000 `timeout` atomically (readTBQueue apnsQ) >>= \case