From f7cdec2f0840b45fc07bfa1a1e9ed104e7da7dec Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Fri, 19 Jan 2024 21:42:28 +0200 Subject: [PATCH] fix: support multiple notification servers in configuration (#971) * tests: add trpl-keeps-server check * add smp server switch check * add connection test and fix withNtfServer * Update src/Simplex/Messaging/Agent/NtfSubSupervisor.hs Co-authored-by: Evgeny Poberezkin * use ntfServer from token * rename --------- Co-authored-by: Evgeny Poberezkin Co-authored-by: Evgeny Poberezkin --- .../Messaging/Agent/NtfSubSupervisor.hs | 10 +- tests/AgentTests/NotificationTests.hs | 94 +++++++++++++------ 2 files changed, 71 insertions(+), 33 deletions(-) diff --git a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs index 5af2f053e..5e51b2217 100644 --- a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs +++ b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs @@ -72,7 +72,7 @@ processNtfSub c (connId, cmd) = do logInfo $ "processNtfSub, NSCCreate - a = " <> tshow a case a of Nothing -> do - withNtfServer c $ \ntfServer -> do + withTokenServer $ \ntfServer -> do case clientNtfCreds of Just ClientNtfCreds {notifierId} -> do let newSub = newNtfSubscription connId smpServer (Just notifierId) ntfServer NASKey @@ -99,7 +99,7 @@ processNtfSub c (connId, cmd) = do | isDeleteNtfSubAction action -> do if ntfSubStatus == NASNew || ntfSubStatus == NASOff || ntfSubStatus == NASDeleted then resetSubscription - else withNtfServer c $ \ntfServer -> do + else withTokenServer $ \ntfServer -> do withStore' c $ \db -> supervisorUpdateNtfSub db sub {ntfServer} (NtfSubNTFAction NSACreate) void $ getNtfNTFWorker True c ntfServer | otherwise -> case action of @@ -111,7 +111,7 @@ processNtfSub c (connId, cmd) = do void $ getNtfNTFWorker True c subNtfServer resetSubscription :: m () resetSubscription = - withNtfServer c $ \ntfServer -> do + withTokenServer $ \ntfServer -> do let sub' = sub {ntfQueueId = Nothing, ntfServer, ntfSubId = Nothing, ntfSubStatus = NASNew} withStore' c $ \db -> supervisorUpdateNtfSub db sub' (NtfSubSMPAction NSASmpKey) void $ getNtfSMPWorker True c smpServer @@ -143,8 +143,8 @@ getNtfSMPWorker hasWork c server = do ws <- asks $ ntfSMPWorkers . ntfSupervisor getAgentWorker "ntf_smp" hasWork c server ws $ runNtfSMPWorker c server -withNtfServer :: AgentMonad' m => AgentClient -> (NtfServer -> m ()) -> m () -withNtfServer c action = getNtfServer c >>= mapM_ action +withTokenServer :: AgentMonad' m => (NtfServer -> m ()) -> m () +withTokenServer action = getNtfToken >>= mapM_ (\NtfToken {ntfServer} -> action ntfServer) runNtfWorker :: forall m. AgentMonad m => AgentClient -> NtfServer -> Worker -> m () runNtfWorker c srv Worker {doWork} = do diff --git a/tests/AgentTests/NotificationTests.hs b/tests/AgentTests/NotificationTests.hs index 448b7d368..15ba1993e 100644 --- a/tests/AgentTests/NotificationTests.hs +++ b/tests/AgentTests/NotificationTests.hs @@ -23,7 +23,7 @@ import qualified Data.ByteString.Base64.URL as U import Data.ByteString.Char8 (ByteString) import Data.Text.Encoding (encodeUtf8) import NtfClient -import SMPAgentClient (agentCfg, initAgentServers, initAgentServers2, testDB, testDB2, testNtfServer2) +import SMPAgentClient (agentCfg, initAgentServers, initAgentServers2, testDB, testDB2, testDB3, testNtfServer2) import SMPClient (cfg, testPort, testPort2, testStoreLogFile2, withSmpServer, withSmpServerConfigOn, withSmpServerStoreLogOn, xit') import Simplex.Messaging.Agent import Simplex.Messaging.Agent.Client (withStore') @@ -105,11 +105,16 @@ notificationTests t = testServerMatrix2 t $ \servers -> withAPNSMockServer $ \apns -> withNtfServer t $ testSwitchNotifications servers apns - describe "should switch notifications to the new NTF server" $ - testServerMatrix2 t $ \servers -> + it "should keep sending notifications for old token" $ + withSmpServer t $ + withAPNSMockServer $ \apns -> + withNtfServerOn t ntfTestPort $ + testNotificationsOldToken apns + it "should update server from new token" $ + withSmpServer t $ withAPNSMockServer $ \apns -> withNtfServerOn t ntfTestPort2 . withNtfServerThreadOn t ntfTestPort $ \ntf -> - testSwitchNotificationServer servers apns ntf + testNotificationsNewToken apns ntf testNotificationToken :: APNSMockServer -> IO () testNotificationToken APNSMockServer {apnsQ} = do @@ -280,7 +285,6 @@ testNtfTokenChangeServers t APNSMockServer {apnsQ} = getTestNtfTokenPort a >>= \port -> liftIO $ port `shouldBe` ntfTestPort -- not yet changed -- trigger token replace tkn2 <- registerTestToken a "xyzw" NMInstant apnsQ - -- registerNtfToken a tkn2 NMInstant >>= \r -> liftIO $ r `shouldBe` NTRegistered getTestNtfTokenPort a >>= \port -> liftIO $ port `shouldBe` ntfTestPort -- not yet changed deleteNtfToken a tkn2 -- force server switch Left BROKER {brokerErr = NETWORK} <- tryError $ registerTestToken a "qwer" NMInstant apnsQ -- ok, it's down for now @@ -649,34 +653,68 @@ testSwitchNotifications servers APNSMockServer {apnsQ} = do disconnectAgentClient a disconnectAgentClient b -testSwitchNotificationServer :: InitialAgentServers -> APNSMockServer -> ThreadId -> IO () -testSwitchNotificationServer servers APNSMockServer {apnsQ} ntf = do - a <- getSMPAgentClient' 1 agentCfg servers testDB - b <- getSMPAgentClient' 2 agentCfg servers testDB2 +testNotificationsOldToken :: APNSMockServer -> IO () +testNotificationsOldToken APNSMockServer {apnsQ} = do + a <- getSMPAgentClient' 1 agentCfg initAgentServers testDB + b <- getSMPAgentClient' 2 agentCfg initAgentServers testDB2 + c <- getSMPAgentClient' 3 agentCfg initAgentServers testDB3 runRight_ $ do - (aId, bId) <- makeConnection a b - exchangeGreetingsMsgId 4 a bId b aId - aTkn <- registerTestToken a "abcd" NMInstant apnsQ - liftIO $ threadDelay 250000 - let testMessage msg = do - msgId <- sendMessage b aId (SMP.MsgFlags True) msg - get b ##> ("", aId, SENT msgId) - void $ messageNotification apnsQ - get a =##> \case ("", c, Msg msg') -> c == bId && msg == msg'; _ -> False - ackMessage a bId msgId Nothing - testMessage "hello" - -- switch over to a new server - setNtfServers a [testNtfServer2] - deleteNtfToken a aTkn + (abId, baId) <- makeConnection a b + let testMessageAB = testMessage_ apnsQ a abId b baId _ <- registerTestToken a "abcd" NMInstant apnsQ - getTestNtfTokenPort a >>= \port2 -> liftIO $ port2 `shouldBe` ntfTestPort2 - liftIO $ killThread ntf -- shut down old server - liftIO $ threadDelay 500000 - testMessage "hello again" + liftIO $ threadDelay 250000 + testMessageAB "hello" + -- change server + setNtfServers a [testNtfServer2] -- server 2 isn't running now, don't use + -- replacing token keeps server + _ <- registerTestToken a "xyzw" NMInstant apnsQ + getTestNtfTokenPort a >>= \port -> liftIO $ port `shouldBe` ntfTestPort + testMessageAB "still there" + -- new connections keep server + (acId, caId) <- makeConnection a c + let testMessageAC = testMessage_ apnsQ a acId c caId + testMessageAC "greetings" disconnectAgentClient a disconnectAgentClient b + disconnectAgentClient c -messageNotification :: TBQueue APNSMockRequest -> ExceptT AgentErrorType IO (C.CbNonce, ByteString) +testNotificationsNewToken :: APNSMockServer -> ThreadId -> IO () +testNotificationsNewToken APNSMockServer {apnsQ} oldNtf = do + a <- getSMPAgentClient' 1 agentCfg initAgentServers testDB + b <- getSMPAgentClient' 2 agentCfg initAgentServers testDB2 + c <- getSMPAgentClient' 3 agentCfg initAgentServers testDB3 + runRight_ $ do + (abId, baId) <- makeConnection a b + let testMessageAB = testMessage_ apnsQ a abId b baId + tkn <- registerTestToken a "abcd" NMInstant apnsQ + getTestNtfTokenPort a >>= \port -> liftIO $ port `shouldBe` ntfTestPort + liftIO $ threadDelay 250000 + testMessageAB "hello" + -- switch + setNtfServers a [testNtfServer2] + deleteNtfToken a tkn + _ <- registerTestToken a "abcd" NMInstant apnsQ + getTestNtfTokenPort a >>= \port -> liftIO $ port `shouldBe` ntfTestPort2 + liftIO $ threadDelay 250000 + liftIO $ killThread oldNtf + -- -- back to work + testMessageAB "hello again" + (acId, caId) <- makeConnection a c + let testMessageAC = testMessage_ apnsQ a acId c caId + testMessageAC "greetings" + disconnectAgentClient a + disconnectAgentClient b + disconnectAgentClient c + +testMessage_ :: HasCallStack => TBQueue APNSMockRequest -> AgentClient -> ConnId -> AgentClient -> ConnId -> SMP.MsgBody -> ExceptT AgentErrorType IO () +testMessage_ apnsQ a aId b bId msg = do + msgId <- sendMessage b aId (SMP.MsgFlags True) msg + get b ##> ("", aId, SENT msgId) + void $ messageNotification apnsQ + get a =##> \case ("", c, Msg msg') -> c == bId && msg == msg'; _ -> False + ackMessage a bId msgId Nothing + +messageNotification :: HasCallStack => TBQueue APNSMockRequest -> ExceptT AgentErrorType IO (C.CbNonce, ByteString) messageNotification apnsQ = do 1000000 `timeout` atomically (readTBQueue apnsQ) >>= \case Nothing -> error "no notification"