mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-31 01:05:57 +00:00
fix: support multiple notification servers in configuration (#971)
* tests: add trpl-keeps-server check * add smp server switch check * add connection test and fix withNtfServer * Update src/Simplex/Messaging/Agent/NtfSubSupervisor.hs Co-authored-by: Evgeny Poberezkin <e.poberezkin@me.com> * use ntfServer from token * rename --------- Co-authored-by: Evgeny Poberezkin <e.poberezkin@me.com> Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>
This commit is contained in:
committed by
GitHub
parent
8ff89c19dc
commit
f7cdec2f08
@@ -72,7 +72,7 @@ processNtfSub c (connId, cmd) = do
|
||||
logInfo $ "processNtfSub, NSCCreate - a = " <> tshow a
|
||||
case a of
|
||||
Nothing -> do
|
||||
withNtfServer c $ \ntfServer -> do
|
||||
withTokenServer $ \ntfServer -> do
|
||||
case clientNtfCreds of
|
||||
Just ClientNtfCreds {notifierId} -> do
|
||||
let newSub = newNtfSubscription connId smpServer (Just notifierId) ntfServer NASKey
|
||||
@@ -99,7 +99,7 @@ processNtfSub c (connId, cmd) = do
|
||||
| isDeleteNtfSubAction action -> do
|
||||
if ntfSubStatus == NASNew || ntfSubStatus == NASOff || ntfSubStatus == NASDeleted
|
||||
then resetSubscription
|
||||
else withNtfServer c $ \ntfServer -> do
|
||||
else withTokenServer $ \ntfServer -> do
|
||||
withStore' c $ \db -> supervisorUpdateNtfSub db sub {ntfServer} (NtfSubNTFAction NSACreate)
|
||||
void $ getNtfNTFWorker True c ntfServer
|
||||
| otherwise -> case action of
|
||||
@@ -111,7 +111,7 @@ processNtfSub c (connId, cmd) = do
|
||||
void $ getNtfNTFWorker True c subNtfServer
|
||||
resetSubscription :: m ()
|
||||
resetSubscription =
|
||||
withNtfServer c $ \ntfServer -> do
|
||||
withTokenServer $ \ntfServer -> do
|
||||
let sub' = sub {ntfQueueId = Nothing, ntfServer, ntfSubId = Nothing, ntfSubStatus = NASNew}
|
||||
withStore' c $ \db -> supervisorUpdateNtfSub db sub' (NtfSubSMPAction NSASmpKey)
|
||||
void $ getNtfSMPWorker True c smpServer
|
||||
@@ -143,8 +143,8 @@ getNtfSMPWorker hasWork c server = do
|
||||
ws <- asks $ ntfSMPWorkers . ntfSupervisor
|
||||
getAgentWorker "ntf_smp" hasWork c server ws $ runNtfSMPWorker c server
|
||||
|
||||
withNtfServer :: AgentMonad' m => AgentClient -> (NtfServer -> m ()) -> m ()
|
||||
withNtfServer c action = getNtfServer c >>= mapM_ action
|
||||
withTokenServer :: AgentMonad' m => (NtfServer -> m ()) -> m ()
|
||||
withTokenServer action = getNtfToken >>= mapM_ (\NtfToken {ntfServer} -> action ntfServer)
|
||||
|
||||
runNtfWorker :: forall m. AgentMonad m => AgentClient -> NtfServer -> Worker -> m ()
|
||||
runNtfWorker c srv Worker {doWork} = do
|
||||
|
||||
@@ -23,7 +23,7 @@ import qualified Data.ByteString.Base64.URL as U
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import Data.Text.Encoding (encodeUtf8)
|
||||
import NtfClient
|
||||
import SMPAgentClient (agentCfg, initAgentServers, initAgentServers2, testDB, testDB2, testNtfServer2)
|
||||
import SMPAgentClient (agentCfg, initAgentServers, initAgentServers2, testDB, testDB2, testDB3, testNtfServer2)
|
||||
import SMPClient (cfg, testPort, testPort2, testStoreLogFile2, withSmpServer, withSmpServerConfigOn, withSmpServerStoreLogOn, xit')
|
||||
import Simplex.Messaging.Agent
|
||||
import Simplex.Messaging.Agent.Client (withStore')
|
||||
@@ -105,11 +105,16 @@ notificationTests t =
|
||||
testServerMatrix2 t $ \servers ->
|
||||
withAPNSMockServer $ \apns ->
|
||||
withNtfServer t $ testSwitchNotifications servers apns
|
||||
describe "should switch notifications to the new NTF server" $
|
||||
testServerMatrix2 t $ \servers ->
|
||||
it "should keep sending notifications for old token" $
|
||||
withSmpServer t $
|
||||
withAPNSMockServer $ \apns ->
|
||||
withNtfServerOn t ntfTestPort $
|
||||
testNotificationsOldToken apns
|
||||
it "should update server from new token" $
|
||||
withSmpServer t $
|
||||
withAPNSMockServer $ \apns ->
|
||||
withNtfServerOn t ntfTestPort2 . withNtfServerThreadOn t ntfTestPort $ \ntf ->
|
||||
testSwitchNotificationServer servers apns ntf
|
||||
testNotificationsNewToken apns ntf
|
||||
|
||||
testNotificationToken :: APNSMockServer -> IO ()
|
||||
testNotificationToken APNSMockServer {apnsQ} = do
|
||||
@@ -280,7 +285,6 @@ testNtfTokenChangeServers t APNSMockServer {apnsQ} =
|
||||
getTestNtfTokenPort a >>= \port -> liftIO $ port `shouldBe` ntfTestPort -- not yet changed
|
||||
-- trigger token replace
|
||||
tkn2 <- registerTestToken a "xyzw" NMInstant apnsQ
|
||||
-- registerNtfToken a tkn2 NMInstant >>= \r -> liftIO $ r `shouldBe` NTRegistered
|
||||
getTestNtfTokenPort a >>= \port -> liftIO $ port `shouldBe` ntfTestPort -- not yet changed
|
||||
deleteNtfToken a tkn2 -- force server switch
|
||||
Left BROKER {brokerErr = NETWORK} <- tryError $ registerTestToken a "qwer" NMInstant apnsQ -- ok, it's down for now
|
||||
@@ -649,34 +653,68 @@ testSwitchNotifications servers APNSMockServer {apnsQ} = do
|
||||
disconnectAgentClient a
|
||||
disconnectAgentClient b
|
||||
|
||||
testSwitchNotificationServer :: InitialAgentServers -> APNSMockServer -> ThreadId -> IO ()
|
||||
testSwitchNotificationServer servers APNSMockServer {apnsQ} ntf = do
|
||||
a <- getSMPAgentClient' 1 agentCfg servers testDB
|
||||
b <- getSMPAgentClient' 2 agentCfg servers testDB2
|
||||
testNotificationsOldToken :: APNSMockServer -> IO ()
|
||||
testNotificationsOldToken APNSMockServer {apnsQ} = do
|
||||
a <- getSMPAgentClient' 1 agentCfg initAgentServers testDB
|
||||
b <- getSMPAgentClient' 2 agentCfg initAgentServers testDB2
|
||||
c <- getSMPAgentClient' 3 agentCfg initAgentServers testDB3
|
||||
runRight_ $ do
|
||||
(aId, bId) <- makeConnection a b
|
||||
exchangeGreetingsMsgId 4 a bId b aId
|
||||
aTkn <- registerTestToken a "abcd" NMInstant apnsQ
|
||||
liftIO $ threadDelay 250000
|
||||
let testMessage msg = do
|
||||
msgId <- sendMessage b aId (SMP.MsgFlags True) msg
|
||||
get b ##> ("", aId, SENT msgId)
|
||||
void $ messageNotification apnsQ
|
||||
get a =##> \case ("", c, Msg msg') -> c == bId && msg == msg'; _ -> False
|
||||
ackMessage a bId msgId Nothing
|
||||
testMessage "hello"
|
||||
-- switch over to a new server
|
||||
setNtfServers a [testNtfServer2]
|
||||
deleteNtfToken a aTkn
|
||||
(abId, baId) <- makeConnection a b
|
||||
let testMessageAB = testMessage_ apnsQ a abId b baId
|
||||
_ <- registerTestToken a "abcd" NMInstant apnsQ
|
||||
getTestNtfTokenPort a >>= \port2 -> liftIO $ port2 `shouldBe` ntfTestPort2
|
||||
liftIO $ killThread ntf -- shut down old server
|
||||
liftIO $ threadDelay 500000
|
||||
testMessage "hello again"
|
||||
liftIO $ threadDelay 250000
|
||||
testMessageAB "hello"
|
||||
-- change server
|
||||
setNtfServers a [testNtfServer2] -- server 2 isn't running now, don't use
|
||||
-- replacing token keeps server
|
||||
_ <- registerTestToken a "xyzw" NMInstant apnsQ
|
||||
getTestNtfTokenPort a >>= \port -> liftIO $ port `shouldBe` ntfTestPort
|
||||
testMessageAB "still there"
|
||||
-- new connections keep server
|
||||
(acId, caId) <- makeConnection a c
|
||||
let testMessageAC = testMessage_ apnsQ a acId c caId
|
||||
testMessageAC "greetings"
|
||||
disconnectAgentClient a
|
||||
disconnectAgentClient b
|
||||
disconnectAgentClient c
|
||||
|
||||
messageNotification :: TBQueue APNSMockRequest -> ExceptT AgentErrorType IO (C.CbNonce, ByteString)
|
||||
testNotificationsNewToken :: APNSMockServer -> ThreadId -> IO ()
|
||||
testNotificationsNewToken APNSMockServer {apnsQ} oldNtf = do
|
||||
a <- getSMPAgentClient' 1 agentCfg initAgentServers testDB
|
||||
b <- getSMPAgentClient' 2 agentCfg initAgentServers testDB2
|
||||
c <- getSMPAgentClient' 3 agentCfg initAgentServers testDB3
|
||||
runRight_ $ do
|
||||
(abId, baId) <- makeConnection a b
|
||||
let testMessageAB = testMessage_ apnsQ a abId b baId
|
||||
tkn <- registerTestToken a "abcd" NMInstant apnsQ
|
||||
getTestNtfTokenPort a >>= \port -> liftIO $ port `shouldBe` ntfTestPort
|
||||
liftIO $ threadDelay 250000
|
||||
testMessageAB "hello"
|
||||
-- switch
|
||||
setNtfServers a [testNtfServer2]
|
||||
deleteNtfToken a tkn
|
||||
_ <- registerTestToken a "abcd" NMInstant apnsQ
|
||||
getTestNtfTokenPort a >>= \port -> liftIO $ port `shouldBe` ntfTestPort2
|
||||
liftIO $ threadDelay 250000
|
||||
liftIO $ killThread oldNtf
|
||||
-- -- back to work
|
||||
testMessageAB "hello again"
|
||||
(acId, caId) <- makeConnection a c
|
||||
let testMessageAC = testMessage_ apnsQ a acId c caId
|
||||
testMessageAC "greetings"
|
||||
disconnectAgentClient a
|
||||
disconnectAgentClient b
|
||||
disconnectAgentClient c
|
||||
|
||||
testMessage_ :: HasCallStack => TBQueue APNSMockRequest -> AgentClient -> ConnId -> AgentClient -> ConnId -> SMP.MsgBody -> ExceptT AgentErrorType IO ()
|
||||
testMessage_ apnsQ a aId b bId msg = do
|
||||
msgId <- sendMessage b aId (SMP.MsgFlags True) msg
|
||||
get b ##> ("", aId, SENT msgId)
|
||||
void $ messageNotification apnsQ
|
||||
get a =##> \case ("", c, Msg msg') -> c == bId && msg == msg'; _ -> False
|
||||
ackMessage a bId msgId Nothing
|
||||
|
||||
messageNotification :: HasCallStack => TBQueue APNSMockRequest -> ExceptT AgentErrorType IO (C.CbNonce, ByteString)
|
||||
messageNotification apnsQ = do
|
||||
1000000 `timeout` atomically (readTBQueue apnsQ) >>= \case
|
||||
Nothing -> error "no notification"
|
||||
|
||||
Reference in New Issue
Block a user