From 3d6ad64d6214dcaa05cd6993b4682bef42ddeb17 Mon Sep 17 00:00:00 2001 From: JRoberts <8711996+jr-simplex@users.noreply.github.com> Date: Mon, 27 Jun 2022 21:54:35 +0400 Subject: [PATCH] ntf: registerNtfToken rework, notification modes (#431) * check mode for new subscriptions * check token inside actions * migration - apple -> apns * wip * register logic, modes * update mode, cron config, verify token changes * refactor * fix test * NTFMODE * server: delete subscriptions on deleteToken * refactor markNtfSubscriptionForDeletion * remove NTFMODE * remove subscriptions when token is deleted * refactor * lint * test * check ntfMode Co-authored-by: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> --- src/Simplex/Messaging/Agent.hs | 135 +++++++++-------- src/Simplex/Messaging/Agent/Env/SQLite.hs | 5 +- .../Messaging/Agent/NtfSubSupervisor.hs | 139 ++++++++++-------- src/Simplex/Messaging/Agent/Protocol.hs | 7 +- src/Simplex/Messaging/Agent/Store/SQLite.hs | 85 ++++++----- .../Migrations/M20220322_notifications.hs | 2 +- .../Migrations/M20220625_v2_ntf_mode.hs | 2 + .../Store/SQLite/Migrations/agent_schema.sql | 2 +- src/Simplex/Messaging/Notifications/Client.hs | 3 - src/Simplex/Messaging/Notifications/Server.hs | 9 +- .../Messaging/Notifications/Server/Store.hs | 57 +++---- tests/AgentTests/NotificationTests.hs | 76 +++++++++- 12 files changed, 300 insertions(+), 222 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 797c469aa..8b33d3176 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -55,7 +55,6 @@ module Simplex.Messaging.Agent setNtfServers, registerNtfToken, verifyNtfToken, - enableNtfCron, checkNtfToken, deleteNtfToken, getNtfToken, @@ -83,7 +82,6 @@ import Data.Maybe (isJust) import qualified Data.Text as T import Data.Time.Clock import Data.Time.Clock.System (systemToUTCTime) -import Data.Word (Word16) import qualified Database.SQLite.Simple as DB -- import GHC.Conc (unsafeIOToSTM) import Simplex.Messaging.Agent.Client @@ -99,7 +97,7 @@ import qualified Simplex.Messaging.Crypto.Ratchet as CR import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String (StrEncoding (..)) import Simplex.Messaging.Notifications.Client -import Simplex.Messaging.Notifications.Protocol (DeviceToken, NtfRegCode (NtfRegCode), NtfTknStatus (..)) +import Simplex.Messaging.Notifications.Protocol (DeviceToken, NtfRegCode (NtfRegCode), NtfTknStatus (..), NtfTokenId) import Simplex.Messaging.Notifications.Server.Push.APNS (PNMessageData (..)) import Simplex.Messaging.Parsers (parse) import Simplex.Messaging.Protocol (BrokerMsg, ErrorType (AUTH), MsgBody, MsgFlags, SMPMsgMeta) @@ -199,10 +197,6 @@ registerNtfToken c = withAgentEnv c .: registerNtfToken' c verifyNtfToken :: AgentErrorMonad m => AgentClient -> DeviceToken -> ByteString -> C.CbNonce -> m () verifyNtfToken c = withAgentEnv c .:. verifyNtfToken' c --- | Enable/disable periodic notifications -enableNtfCron :: AgentErrorMonad m => AgentClient -> DeviceToken -> Word16 -> m () -enableNtfCron c = withAgentEnv c .: enableNtfCron' c - checkNtfToken :: AgentErrorMonad m => AgentClient -> DeviceToken -> m NtfTknStatus checkNtfToken c = withAgentEnv c . checkNtfToken' c @@ -641,7 +635,7 @@ deleteConnection' c connId = atomically $ removeSubscription c connId withStore' c (`deleteConn` connId) ns <- asks ntfSupervisor - atomically $ sendNtfSubCommand ns (connId, NSCDelete) + atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCDelete) -- | Change servers to be used for creating new queues, in Reader monad setSMPServers' :: AgentMonad m => AgentClient -> NonEmpty SMPServer -> m () @@ -649,31 +643,50 @@ setSMPServers' c servers = do atomically $ writeTVar (smpServers c) servers registerNtfToken' :: forall m. AgentMonad m => AgentClient -> DeviceToken -> NotificationsMode -> m NtfTknStatus -registerNtfToken' c deviceToken ntfMode = - withStore' c (`getDeviceNtfToken` deviceToken) >>= \case - (Just tkn@NtfToken {ntfTokenId, ntfTknStatus, ntfTknAction, ntfMode = _currentNtfMode}, prevTokens) -> do - mapM_ (deleteToken_ c) prevTokens - ns <- asks ntfSupervisor - case (ntfTokenId, ntfTknAction) of - (Nothing, Just NTARegister) -> registerToken tkn $> NTRegistered +registerNtfToken' c suppliedDeviceToken suppliedNtfMode = + withStore' c getSavedNtfToken >>= \case + Just tkn@NtfToken {deviceToken = savedDeviceToken, ntfTokenId, ntfTknStatus, ntfTknAction, ntfMode = savedNtfMode} -> do + status <- case (ntfTokenId, ntfTknAction) of + (Nothing, Just NTARegister) -> do + when (savedDeviceToken /= suppliedDeviceToken) $ withStore' c $ \db -> updateDeviceToken db tkn suppliedDeviceToken + registerToken tkn $> NTRegistered -- TODO minimal time before repeat registration - (Just _, Nothing) -> when (ntfTknStatus == NTRegistered) (registerToken tkn) $> NTRegistered - (Just tknId, Just (NTAVerify code)) -> - t tkn (NTActive, Just NTACheck) $ agentNtfVerifyToken c tknId tkn code - (Just tknId, Just (NTACron interval)) -> - t tkn (cronSuccess interval) $ agentNtfEnableCron c tknId tkn interval - (Just _tknId, Just NTACheck) -> do - if ntfTknStatus == NTActive && ntfMode == NMInstant - then initializeNtfSubQ c tkn - else atomically $ nsUpdateToken ns tkn - pure ntfTknStatus -- TODO - -- agentNtfCheckToken c tknId tkn >>= \case + (Just tknId, Nothing) + | savedDeviceToken == suppliedDeviceToken -> + when (ntfTknStatus == NTRegistered) (registerToken tkn) $> NTRegistered + | otherwise -> replaceToken tknId $> NTRegistered + (Just tknId, Just (NTAVerify code)) + | savedDeviceToken == suppliedDeviceToken -> + t tkn (NTActive, Just NTACheck) $ agentNtfVerifyToken c tknId tkn code + | otherwise -> replaceToken tknId $> NTRegistered + (Just tknId, Just NTACheck) + | savedDeviceToken == suppliedDeviceToken -> do + ns <- asks ntfSupervisor + atomically $ nsUpdateToken ns tkn {ntfMode = suppliedNtfMode} + when (ntfTknStatus == NTActive) $ do + cron <- asks $ ntfCron . config + agentNtfEnableCron c tknId tkn cron + when (suppliedNtfMode == NMInstant) $ initializeNtfSubs c + when (suppliedNtfMode == NMPeriodic && savedNtfMode == NMInstant) $ smpDeleteNtfSubs c + pure ntfTknStatus -- TODO + -- agentNtfCheckToken c tknId tkn >>= \case + | otherwise -> replaceToken tknId $> NTRegistered (Just tknId, Just NTADelete) -> do agentNtfDeleteToken c tknId tkn withStore' c (`removeNtfToken` tkn) + ns <- asks ntfSupervisor atomically $ nsRemoveNtfToken ns pure NTExpired _ -> pure ntfTknStatus + withStore' c $ \db -> updateNtfMode db tkn suppliedNtfMode + pure status + where + replaceToken :: NtfTokenId -> m () + replaceToken tknId = do + agentNtfReplaceToken c tknId tkn suppliedDeviceToken + withStore' c $ \db -> updateDeviceToken db tkn suppliedDeviceToken + ns <- asks ntfSupervisor + atomically $ nsUpdateToken ns tkn {deviceToken = suppliedDeviceToken, ntfTknStatus = NTRegistered, ntfMode = suppliedNtfMode} _ -> getNtfServer c >>= \case Just ntfServer -> @@ -681,7 +694,7 @@ registerNtfToken' c deviceToken ntfMode = C.SignAlg a -> do tknKeys <- liftIO $ C.generateSignatureKeyPair a dhKeys <- liftIO C.generateKeyPair' - let tkn = newNtfToken deviceToken ntfServer tknKeys dhKeys ntfMode + let tkn = newNtfToken suppliedDeviceToken ntfServer tknKeys dhKeys suppliedNtfMode withStore' c (`createNtfToken` tkn) registerToken tkn pure NTRegistered @@ -694,52 +707,52 @@ registerNtfToken' c deviceToken ntfMode = let dhSecret = C.dh' srvPubDhKey privDhKey withStore' c $ \db -> updateNtfTokenRegistration db tkn tknId dhSecret ns <- asks ntfSupervisor - atomically $ nsUpdateToken ns tkn + atomically $ nsUpdateToken ns tkn {deviceToken = suppliedDeviceToken, ntfTknStatus = NTRegistered, ntfMode = suppliedNtfMode} -- TODO decrypt verification code verifyNtfToken' :: AgentMonad m => AgentClient -> DeviceToken -> ByteString -> C.CbNonce -> m () verifyNtfToken' c deviceToken code nonce = - withStore' c (`getDeviceNtfToken` deviceToken) >>= \case - (Just tkn@NtfToken {ntfTokenId = Just tknId, ntfDhSecret = Just dhSecret}, _) -> do + withStore' c getSavedNtfToken >>= \case + Just tkn@NtfToken {deviceToken = savedDeviceToken, ntfTokenId = Just tknId, ntfDhSecret = Just dhSecret, ntfMode} -> do + when (deviceToken /= savedDeviceToken) . throwError $ CMD PROHIBITED code' <- liftEither . bimap cryptoError NtfRegCode $ C.cbDecrypt dhSecret nonce code - void . withToken c tkn (Just (NTConfirmed, NTAVerify code')) (NTActive, Just NTACheck) $ do - agentNtfVerifyToken c tknId tkn code' + toStatus <- + withToken c tkn (Just (NTConfirmed, NTAVerify code')) (NTActive, Just NTACheck) $ + agentNtfVerifyToken c tknId tkn code' + when (toStatus == NTActive) $ do + cron <- asks $ ntfCron . config + agentNtfEnableCron c tknId tkn cron + when (ntfMode == NMInstant) $ initializeNtfSubs c _ -> throwError $ CMD PROHIBITED -enableNtfCron' :: AgentMonad m => AgentClient -> DeviceToken -> Word16 -> m () -enableNtfCron' c deviceToken interval = do - when (interval < 20) . throwError $ CMD PROHIBITED - withStore' c (`getDeviceNtfToken` deviceToken) >>= \case - (Just tkn@NtfToken {ntfTokenId = Just tknId, ntfTknStatus = NTActive}, _) -> - void . withToken c tkn (Just (NTActive, NTACron interval)) (cronSuccess interval) $ - agentNtfEnableCron c tknId tkn interval - _ -> throwError $ CMD PROHIBITED - -cronSuccess :: Word16 -> (NtfTknStatus, Maybe NtfTknAction) -cronSuccess interval - | interval == 0 = (NTActive, Just NTACheck) - | otherwise = (NTActive, Just $ NTACron interval) - checkNtfToken' :: AgentMonad m => AgentClient -> DeviceToken -> m NtfTknStatus checkNtfToken' c deviceToken = - withStore' c (`getDeviceNtfToken` deviceToken) >>= \case - (Just tkn@NtfToken {ntfTokenId = Just tknId}, _) -> agentNtfCheckToken c tknId tkn + withStore' c getSavedNtfToken >>= \case + Just tkn@NtfToken {deviceToken = savedDeviceToken, ntfTokenId = Just tknId} -> do + when (deviceToken /= savedDeviceToken) . throwError $ CMD PROHIBITED + agentNtfCheckToken c tknId tkn _ -> throwError $ CMD PROHIBITED deleteNtfToken' :: AgentMonad m => AgentClient -> DeviceToken -> m () deleteNtfToken' c deviceToken = - withStore' c (`getDeviceNtfToken` deviceToken) >>= \case - (Just tkn, _) -> deleteToken_ c tkn + withStore' c getSavedNtfToken >>= \case + Just tkn@NtfToken {deviceToken = savedDeviceToken} -> do + when (deviceToken /= savedDeviceToken) . throwError $ CMD PROHIBITED + deleteToken_ c tkn + smpDeleteNtfSubs c _ -> throwError $ CMD PROHIBITED getNtfToken' :: AgentMonad m => AgentClient -> m (DeviceToken, NtfTknStatus, NotificationsMode) -getNtfToken' _c = throwError $ CMD PROHIBITED +getNtfToken' c = + withStore' c getSavedNtfToken >>= \case + Just NtfToken {deviceToken, ntfTknStatus, ntfMode} -> pure (deviceToken, ntfTknStatus, ntfMode) + _ -> throwError $ CMD PROHIBITED -- | Delete notification subscription for connection, in Reader monad deleteNtfSub' :: AgentMonad m => AgentClient -> ConnId -> m () deleteNtfSub' _c connId = do ns <- asks ntfSupervisor - atomically $ sendNtfSubCommand ns (connId, NSCDelete) + atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCDelete) deleteToken_ :: AgentMonad m => AgentClient -> NtfToken -> m () deleteToken_ c tkn@NtfToken {ntfTokenId, ntfTknStatus} = do @@ -764,9 +777,7 @@ withToken c tkn@NtfToken {deviceToken, ntfMode} from_ (toStatus, toAction_) f = Right _ -> do withStore' c $ \db -> updateNtfToken db tkn toStatus toAction_ let updatedToken = tkn {ntfTknStatus = toStatus, ntfTknAction = toAction_} - if toStatus == NTActive && ntfMode == NMInstant - then initializeNtfSubQ c updatedToken - else atomically $ nsUpdateToken ns updatedToken + atomically $ nsUpdateToken ns updatedToken pure toStatus Left e@(NTF AUTH) -> do withStore' c $ \db -> removeNtfToken db tkn @@ -775,14 +786,18 @@ withToken c tkn@NtfToken {deviceToken, ntfMode} from_ (toStatus, toAction_) f = throwError e Left e -> throwError e -initializeNtfSubQ :: AgentMonad m => AgentClient -> NtfToken -> m () -initializeNtfSubQ c tkn = do +initializeNtfSubs :: AgentMonad m => AgentClient -> m () +initializeNtfSubs c = do ns <- asks ntfSupervisor - connIds <- atomically $ do - nsUpdateToken ns tkn - getSubscriptions c + connIds <- atomically $ getSubscriptions c forM_ connIds $ \connId -> atomically $ sendNtfSubCommand ns (connId, NSCCreate) +smpDeleteNtfSubs :: AgentMonad m => AgentClient -> m () +smpDeleteNtfSubs c = do + ns <- asks ntfSupervisor + connIds <- atomically $ getSubscriptions c + forM_ connIds $ \connId -> atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCSmpDelete) + -- TODO -- There should probably be another function to cancel all subscriptions that would flush the queue first, -- so that supervisor stops processing pending commands? diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index ce3cc0d2d..f8199221b 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -46,6 +46,7 @@ import Simplex.Messaging.Version import System.Random (StdGen, newStdGen) import UnliftIO (Async) import UnliftIO.STM +import Data.Word (Word16) -- | Agent monad with MonadReader Env and MonadError AgentErrorType type AgentMonad m = (MonadUnliftIO m, MonadReader Env m, MonadError AgentErrorType m) @@ -67,6 +68,7 @@ data AgentConfig = AgentConfig reconnectInterval :: RetryInterval, helloTimeout :: NominalDiffTime, resubscriptionConcurrency :: Int, + ntfCron :: Word16, ntfWorkerThrottle :: Int, ntfSubCheckInterval :: NominalDiffTime, ntfMaxMessages :: Int, @@ -101,6 +103,7 @@ defaultAgentConfig = reconnectInterval = defaultReconnectInterval, helloTimeout = 2 * nominalDay, resubscriptionConcurrency = 16, + ntfCron = 20, -- minutes ntfWorkerThrottle = 1000000, -- microseconds ntfSubCheckInterval = nominalDay, ntfMaxMessages = 4, @@ -138,7 +141,7 @@ data NtfSupervisor = NtfSupervisor ntfSMPWorkers :: TMap SMPServer (TMVar (), Async ()) } -data NtfSupervisorCommand = NSCCreate | NSCDelete | NSCNtfWorker NtfServer | NSCNtfSMPWorker SMPServer +data NtfSupervisorCommand = NSCCreate | NSCDelete | NSCSmpDelete | NSCNtfWorker NtfServer | NSCNtfSMPWorker SMPServer newNtfSubSupervisor :: Natural -> STM NtfSupervisor newNtfSubSupervisor qSize = do diff --git a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs index 5e7fd56b8..83bcfbbf2 100644 --- a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs +++ b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs @@ -27,7 +27,7 @@ import qualified Data.Map.Strict as M import Data.Time (UTCTime, addUTCTime, diffUTCTime, getCurrentTime, nominalDiffTimeToSeconds) import Simplex.Messaging.Agent.Client import Simplex.Messaging.Agent.Env.SQLite -import Simplex.Messaging.Agent.Protocol (AgentErrorType (..), BrokerErrorType (..), ConnId) +import Simplex.Messaging.Agent.Protocol (AgentErrorType (..), BrokerErrorType (..), ConnId, NotificationsMode (..)) import qualified Simplex.Messaging.Agent.Protocol as AP import Simplex.Messaging.Agent.RetryInterval import Simplex.Messaging.Agent.Store @@ -90,10 +90,16 @@ processNtfSub c (connId, cmd) = do addNtfWorker ntfServer _ -> pure () -- error - notification server not configured NSCDelete -> do - withStore' c (`markNtfSubscriptionForDeletion` connId) + withStore' c $ \db -> markNtfSubscriptionForDeletion db connId (NtfSubAction NSADelete) case ntfServer_ of (Just ntfServer) -> addNtfWorker ntfServer _ -> pure () + NSCSmpDelete -> do + withStore' c (`getRcvQueue` connId) >>= \case + Right RcvQueue {server = smpServer} -> do + withStore' c $ \db -> markNtfSubscriptionForDeletion db connId (NtfSubSMPAction NSASmpDelete) + addNtfSMPWorker smpServer + Left _ -> pure () NSCNtfWorker ntfServer -> addNtfWorker ntfServer NSCNtfSMPWorker smpServer -> @@ -119,57 +125,61 @@ processNtfSub c (connId, cmd) = do runNtfWorker :: forall m. AgentMonad m => AgentClient -> NtfServer -> TMVar () -> m () runNtfWorker c srv doWork = forever $ do void . atomically $ readTMVar doWork - getNtfToken >>= \case - Just tkn -> do - nextSub_ <- withStore' c (`getNextNtfSubAction` srv) - case nextSub_ of - Nothing -> noWorkToDo - Just ntfSub@(NtfSubscription {connId}, _) -> do - ri <- asks $ reconnectInterval . config - withRetryInterval ri $ \loop -> - processAction tkn ntfSub - `catchError` ( \e -> - case e of - BROKER NETWORK -> loop - BROKER TIMEOUT -> loop - _ -> ntfInternalError c connId (show e) - ) - _ -> noWorkToDo + nextSub_ <- withStore' c (`getNextNtfSubAction` srv) + case nextSub_ of + Nothing -> noWorkToDo + Just ntfSub@(NtfSubscription {connId}, _) -> do + ri <- asks $ reconnectInterval . config + withRetryInterval ri $ \loop -> + processAction ntfSub + `catchError` ( \e -> + case e of + BROKER NETWORK -> loop + BROKER TIMEOUT -> loop + _ -> ntfInternalError c connId (show e) + ) throttle <- asks $ ntfWorkerThrottle . config liftIO $ threadDelay throttle where noWorkToDo = void . atomically $ tryTakeTMVar doWork - processAction :: NtfToken -> (NtfSubscription, NtfSubAction) -> m () - processAction tkn@NtfToken {ntfTokenId, ntfTknStatus} (ntfSub@NtfSubscription {connId, smpServer, ntfSubId}, ntfSubAction) = do + processAction :: (NtfSubscription, NtfSubAction) -> m () + processAction (ntfSub@NtfSubscription {connId, smpServer, ntfSubId}, ntfSubAction) = do ts <- liftIO getCurrentTime unlessM (rescheduleAction doWork ts ntfSub) $ case ntfSubAction of - NSACreate -> do - rq_ <- withStore' c (`getRcvQueue` connId) - case (rq_, ntfTknStatus, ntfTokenId) of - (Right RcvQueue {clientNtfCreds = Just ClientNtfCreds {ntfPrivateKey, notifierId}}, NTActive, Just tknId) -> do - nSubId <- agentNtfCreateSubscription c tknId tkn (SMPQueueNtf smpServer notifierId) ntfPrivateKey - let actionTs = addUTCTime 30 ts - withStore' c $ \db -> - updateNtfSubscription db connId ntfSub {ntfSubId = Just nSubId, ntfSubStatus = NASCreated NSNew, ntfSubActionTs = actionTs} (NtfSubAction NSACheck) - _ -> ntfInternalError c connId "NSACreate - no notifier queue credentials or token not active" - NSACheck -> case ntfSubId of - Just nSubId -> - agentNtfCheckSubscription c nSubId tkn >>= \case - NSNew -> updateSubNextCheck ts NSNew - NSPending -> updateSubNextCheck ts NSPending - NSActive -> updateSubNextCheck ts NSActive - NSEnd -> updateSubNextCheck ts NSEnd - NSSMPAuth -> updateSub (NASCreated NSSMPAuth) (NtfSubAction NSADelete) ts - Nothing -> ntfInternalError c connId "NSACheck - no subscription ID" + NSACreate -> + getNtfToken >>= \case + Just tkn@NtfToken {ntfTokenId = Just tknId, ntfTknStatus = NTActive, ntfMode = NMInstant} -> do + RcvQueue {clientNtfCreds} <- withStore c (`getRcvQueue` connId) + case clientNtfCreds of + Just ClientNtfCreds {ntfPrivateKey, notifierId} -> do + nSubId <- agentNtfCreateSubscription c tknId tkn (SMPQueueNtf smpServer notifierId) ntfPrivateKey + let actionTs = addUTCTime 30 ts + withStore' c $ \db -> + updateNtfSubscription db connId ntfSub {ntfSubId = Just nSubId, ntfSubStatus = NASCreated NSNew, ntfSubActionTs = actionTs} (NtfSubAction NSACheck) + _ -> ntfInternalError c connId "NSACreate - no notifier queue credentials" + _ -> ntfInternalError c connId "NSACreate - no active token" + NSACheck -> + getNtfToken >>= \case + Just tkn -> + case ntfSubId of + Just nSubId -> + agentNtfCheckSubscription c nSubId tkn >>= \case + NSNew -> updateSubNextCheck ts NSNew + NSPending -> updateSubNextCheck ts NSPending + NSActive -> updateSubNextCheck ts NSActive + NSEnd -> updateSubNextCheck ts NSEnd + NSSMPAuth -> updateSub (NASCreated NSSMPAuth) (NtfSubAction NSADelete) ts -- TODO re-create subscription? + Nothing -> ntfInternalError c connId "NSACheck - no subscription ID" + _ -> ntfInternalError c connId "NSACheck - no active token" NSADelete -> case ntfSubId of Just nSubId -> - agentNtfDeleteSubscription c nSubId tkn + (getNtfToken >>= \tkn -> forM_ tkn $ agentNtfDeleteSubscription c nSubId) `E.finally` do withStore' c $ \db -> updateNtfSubscription db connId ntfSub {ntfSubId = Nothing, ntfSubStatus = NASOff, ntfSubActionTs = ts} (NtfSubSMPAction NSASmpDelete) ns <- asks ntfSupervisor - atomically $ sendNtfSubCommand ns (connId, NSCNtfSMPWorker smpServer) + atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCNtfSMPWorker smpServer) Nothing -> ntfInternalError c connId "NSADelete - no subscription ID" where updateSubNextCheck ts toStatus = do @@ -183,35 +193,32 @@ runNtfWorker c srv doWork = forever $ do runNtfSMPWorker :: forall m. AgentMonad m => AgentClient -> SMPServer -> TMVar () -> m () runNtfSMPWorker c srv doWork = forever $ do void . atomically $ readTMVar doWork - getNtfToken >>= \case - Just tkn -> do - nextSub_ <- withStore' c (`getNextNtfSubSMPAction` srv) - case nextSub_ of - Nothing -> noWorkToDo - Just ntfSub@(NtfSubscription {connId}, _) -> do - ri <- asks $ reconnectInterval . config - withRetryInterval ri $ \loop -> - processAction tkn ntfSub - `catchError` ( \e -> - case e of - BROKER NETWORK -> loop - BROKER TIMEOUT -> loop - _ -> ntfInternalError c connId (show e) - ) - _ -> noWorkToDo + nextSub_ <- withStore' c (`getNextNtfSubSMPAction` srv) + case nextSub_ of + Nothing -> noWorkToDo + Just ntfSub@(NtfSubscription {connId}, _) -> do + ri <- asks $ reconnectInterval . config + withRetryInterval ri $ \loop -> + processAction ntfSub + `catchError` ( \e -> + case e of + BROKER NETWORK -> loop + BROKER TIMEOUT -> loop + _ -> ntfInternalError c connId (show e) + ) throttle <- asks $ ntfWorkerThrottle . config liftIO $ threadDelay throttle where noWorkToDo = void . atomically $ tryTakeTMVar doWork - processAction :: NtfToken -> (NtfSubscription, NtfSubSMPAction) -> m () - processAction NtfToken {ntfTknStatus} (ntfSub@NtfSubscription {connId, ntfServer}, ntfSubAction) = do + processAction :: (NtfSubscription, NtfSubSMPAction) -> m () + processAction (ntfSub@NtfSubscription {connId, ntfServer}, ntfSubAction) = do ts <- liftIO getCurrentTime unlessM (rescheduleAction doWork ts ntfSub) $ case ntfSubAction of - NSASmpKey -> do - rq_ <- withStore' c (`getRcvQueue` connId) - case (rq_, ntfTknStatus) of - (Right rq, NTActive) -> do + NSASmpKey -> + getNtfToken >>= \case + Just NtfToken {ntfTknStatus = NTActive, ntfMode = NMInstant} -> do + rq <- withStore c (`getRcvQueue` connId) C.SignAlg a <- asks (cmdSignAlg . config) (ntfPublicKey, ntfPrivateKey) <- liftIO $ C.generateSignatureKeyPair a (rcvNtfPubDhKey, rcvNtfPrivDhKey) <- liftIO C.generateKeyPair' @@ -222,7 +229,7 @@ runNtfSMPWorker c srv doWork = forever $ do updateNtfSubscription st connId ntfSub {ntfQueueId = Just notifierId, ntfSubStatus = NASKey, ntfSubActionTs = ts} (NtfSubAction NSACreate) ns <- asks ntfSupervisor atomically $ sendNtfSubCommand ns (connId, NSCNtfWorker ntfServer) - _ -> ntfInternalError c connId "NSASmpKey - no rcv queue or token not active" + _ -> ntfInternalError c connId "NSASmpKey - no active token" NSASmpDelete -> do rq_ <- withStore' c (`getRcvQueue` connId) forM_ rq_ $ \rq -> disableQueueNotifications c rq @@ -263,7 +270,11 @@ nsRemoveNtfToken ns = writeTVar (ntfTkn ns) Nothing sendNtfSubCommand :: NtfSupervisor -> (ConnId, NtfSupervisorCommand) -> STM () sendNtfSubCommand ns cmd = readTVar (ntfTkn ns) - >>= mapM_ (\NtfToken {ntfTknStatus} -> when (ntfTknStatus == NTActive) $ writeTBQueue (ntfSubQ ns) cmd) + >>= mapM_ + ( \NtfToken {ntfTknStatus, ntfMode} -> + when (ntfTknStatus == NTActive && ntfMode == NMInstant) $ + writeTBQueue (ntfSubQ ns) cmd + ) closeNtfSupervisor :: NtfSupervisor -> IO () closeNtfSupervisor ns = do diff --git a/src/Simplex/Messaging/Agent/Protocol.hs b/src/Simplex/Messaging/Agent/Protocol.hs index ebb4ab301..938d75b9e 100644 --- a/src/Simplex/Messaging/Agent/Protocol.hs +++ b/src/Simplex/Messaging/Agent/Protocol.hs @@ -133,7 +133,6 @@ import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Crypto.Ratchet (E2ERatchetParams, E2ERatchetParamsUri) import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String -import Simplex.Messaging.Notifications.Protocol (NtfTknStatus) import Simplex.Messaging.Parsers import Simplex.Messaging.Protocol ( ErrorType, @@ -231,23 +230,20 @@ data ACommand (p :: AParty) where OK :: ACommand Agent ERR :: AgentErrorType -> ACommand Agent SUSPENDED :: ACommand Agent - NTFMODE :: NtfTknStatus -> NotificationsMode -> ACommand Agent deriving instance Eq (ACommand p) deriving instance Show (ACommand p) -data NotificationsMode = NMOff | NMPeriodic | NMInstant +data NotificationsMode = NMPeriodic | NMInstant deriving (Eq, Show) instance StrEncoding NotificationsMode where strEncode = \case - NMOff -> "OFF" NMPeriodic -> "PERIODIC" NMInstant -> "INSTANT" strP = A.takeTill (== ' ') >>= \case - "OFF" -> pure NMOff "PERIODIC" -> pure NMPeriodic "INSTANT" -> pure NMInstant _ -> fail "bad NotificationsMode" @@ -948,7 +944,6 @@ serializeCommand = \case ERR e -> "ERR " <> strEncode e OK -> "OK" SUSPENDED -> "SUSPENDED" - NTFMODE t m -> "NTFMODE " <> smpEncode t <> " " <> strEncode m where showTs :: UTCTime -> ByteString showTs = B.pack . formatISO8601Millis diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index e7dbb257a..06b1f5c0d 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -66,10 +66,11 @@ module Simplex.Messaging.Agent.Store.SQLite updateRatchet, -- Notification device token persistence createNtfToken, - getDeviceNtfToken, + getSavedNtfToken, updateNtfTokenRegistration, + updateDeviceToken, + updateNtfMode, updateNtfToken, - setNtfTokenNtfMode, removeNtfToken, -- Notification subscription persistence getNtfSubscription, @@ -96,12 +97,12 @@ import Control.Concurrent (threadDelay) import Control.Concurrent.STM (stateTVar) import Control.Monad.Except import Crypto.Random (ChaChaDRG, randomBytesGenerate) -import Data.Bifunctor (first, second) +import Data.Bifunctor (second) import Data.ByteString (ByteString) import qualified Data.ByteString.Base64.URL as U import Data.Char (toLower) import Data.Functor (($>)) -import Data.List (find, foldl', partition) +import Data.List (find, foldl') import qualified Data.Map.Strict as M import Data.Maybe (fromMaybe, listToMaybe) import Data.Text (Text) @@ -122,7 +123,7 @@ import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Crypto.Ratchet (RatchetX448, SkippedMsgDiff (..), SkippedMsgKeys) import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String -import Simplex.Messaging.Notifications.Client (NtfAgentSubStatus (NASDeleted), NtfServer, NtfSubAction (NSADelete), NtfSubOrSMPAction (..), NtfSubSMPAction, NtfSubscription (..), NtfTknAction, NtfToken (..)) +import Simplex.Messaging.Notifications.Client (NtfAgentSubStatus (..), NtfServer, NtfSubAction (..), NtfSubOrSMPAction (..), NtfSubSMPAction (..), NtfSubscription (..), NtfTknAction (..), NtfToken (..)) import Simplex.Messaging.Notifications.Protocol (DeviceToken (..), NtfSubscriptionId, NtfTknStatus (..), NtfTokenId, SMPQueueNtf (..)) import Simplex.Messaging.Parsers (blobFieldParser, fromTextField_) import Simplex.Messaging.Protocol (MsgBody, MsgFlags, ProtocolServer (..), RcvNtfDhSecret) @@ -634,25 +635,23 @@ createNtfToken db NtfToken {deviceToken = DeviceToken provider token, ntfServer |] ((provider, token, host, port, ntfTokenId, ntfPubKey, ntfPrivKey, ntfDhPubKey, ntfDhPrivKey, ntfDhSecret) :. (ntfTknStatus, ntfTknAction, ntfMode)) -getDeviceNtfToken :: DB.Connection -> DeviceToken -> IO (Maybe NtfToken, [NtfToken]) -getDeviceNtfToken db t = do - tokens <- - map ntfToken - <$> DB.query_ - db - [sql| - SELECT s.ntf_host, s.ntf_port, s.ntf_key_hash, - t.provider, t.device_token, t.tkn_id, t.tkn_pub_key, t.tkn_priv_key, t.tkn_pub_dh_key, t.tkn_priv_dh_key, t.tkn_dh_secret, - t.tkn_status, t.tkn_action, t.ntf_mode - FROM ntf_tokens t - JOIN ntf_servers s USING (ntf_host, ntf_port) - |] - pure . first listToMaybe $ partition ((t ==) . deviceToken) tokens +getSavedNtfToken :: DB.Connection -> IO (Maybe NtfToken) +getSavedNtfToken db = do + maybeFirstRow ntfToken $ + DB.query_ + db + [sql| + SELECT s.ntf_host, s.ntf_port, s.ntf_key_hash, + t.provider, t.device_token, t.tkn_id, t.tkn_pub_key, t.tkn_priv_key, t.tkn_pub_dh_key, t.tkn_priv_dh_key, t.tkn_dh_secret, + t.tkn_status, t.tkn_action, t.ntf_mode + FROM ntf_tokens t + JOIN ntf_servers s USING (ntf_host, ntf_port) + |] where ntfToken ((host, port, keyHash) :. (provider, dt, ntfTokenId, ntfPubKey, ntfPrivKey, ntfDhPubKey, ntfDhPrivKey, ntfDhSecret) :. (ntfTknStatus, ntfTknAction, ntfMode_)) = let ntfServer = ProtocolServer {host, port, keyHash} ntfDhKeys = (ntfDhPubKey, ntfDhPrivKey) - ntfMode = fromMaybe NMOff ntfMode_ + ntfMode = fromMaybe NMPeriodic ntfMode_ in NtfToken {deviceToken = DeviceToken provider dt, ntfServer, ntfTokenId, ntfPubKey, ntfPrivKey, ntfDhKeys, ntfDhSecret, ntfTknStatus, ntfTknAction, ntfMode} updateNtfTokenRegistration :: DB.Connection -> NtfToken -> NtfTokenId -> C.DhSecretX25519 -> IO () @@ -667,6 +666,30 @@ updateNtfTokenRegistration db NtfToken {deviceToken = DeviceToken provider token |] (tknId, ntfDhSecret, NTRegistered, Nothing :: Maybe NtfTknAction, updatedAt, provider, token, host, port) +updateDeviceToken :: DB.Connection -> NtfToken -> DeviceToken -> IO () +updateDeviceToken db NtfToken {deviceToken = DeviceToken provider token, ntfServer = ProtocolServer {host, port}} (DeviceToken toProvider toToken) = do + updatedAt <- getCurrentTime + DB.execute + db + [sql| + UPDATE ntf_tokens + SET provider = ?, device_token = ?, tkn_status = ?, tkn_action = ?, updated_at = ? + WHERE provider = ? AND device_token = ? AND ntf_host = ? AND ntf_port = ? + |] + (toProvider, toToken, NTRegistered, Nothing :: Maybe NtfTknAction, updatedAt, provider, token, host, port) + +updateNtfMode :: DB.Connection -> NtfToken -> NotificationsMode -> IO () +updateNtfMode db NtfToken {deviceToken = DeviceToken provider token, ntfServer = ProtocolServer {host, port}} ntfMode = do + updatedAt <- getCurrentTime + DB.execute + db + [sql| + UPDATE ntf_tokens + SET ntf_mode = ?, updated_at = ? + WHERE provider = ? AND device_token = ? AND ntf_host = ? AND ntf_port = ? + |] + (ntfMode, updatedAt, provider, token, host, port) + updateNtfToken :: DB.Connection -> NtfToken -> NtfTknStatus -> Maybe NtfTknAction -> IO () updateNtfToken db NtfToken {deviceToken = DeviceToken provider token, ntfServer = ProtocolServer {host, port}} tknStatus tknAction = do updatedAt <- getCurrentTime @@ -679,18 +702,6 @@ updateNtfToken db NtfToken {deviceToken = DeviceToken provider token, ntfServer |] (tknStatus, tknAction, updatedAt, provider, token, host, port) -setNtfTokenNtfMode :: DB.Connection -> NtfToken -> NotificationsMode -> IO () -setNtfTokenNtfMode db NtfToken {deviceToken = DeviceToken provider token, ntfServer = ProtocolServer {host, port}} ntfMode = do - updatedAt <- getCurrentTime - DB.execute - db - [sql| - UPDATE ntf_tokens - SET ntf_mode = ?, updated_at = ? - WHERE provider = ? AND device_token = ? AND ntf_host = ? AND ntf_port = ? - |] - (ntfMode, updatedAt, provider, token, host, port) - removeNtfToken :: DB.Connection -> NtfToken -> IO () removeNtfToken db NtfToken {deviceToken = DeviceToken provider token, ntfServer = ProtocolServer {host, port}} = DB.execute @@ -737,8 +748,8 @@ createNtfSubscription db NtfSubscription {connId, smpServer = (SMPServer host po where (ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction ntfAction -markNtfSubscriptionForDeletion :: DB.Connection -> ConnId -> IO () -markNtfSubscriptionForDeletion db connId = do +markNtfSubscriptionForDeletion :: DB.Connection -> ConnId -> NtfSubOrSMPAction -> IO () +markNtfSubscriptionForDeletion db connId ntfAction = do updatedAt <- getCurrentTime DB.execute db @@ -747,7 +758,9 @@ markNtfSubscriptionForDeletion db connId = do SET ntf_sub_action = ?, ntf_sub_smp_action = ?, ntf_sub_action_ts = ?, updated_by_supervisor = ?, updated_at = ? WHERE conn_id = ? |] - (Just NSADelete, Nothing :: Maybe NtfSubSMPAction, updatedAt, True, updatedAt, connId) + (ntfSubAction, ntfSubSMPAction, updatedAt, True, updatedAt, connId) + where + (ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction ntfAction updateNtfSubscription :: DB.Connection -> ConnId -> NtfSubscription -> NtfSubOrSMPAction -> IO () updateNtfSubscription db connId NtfSubscription {ntfQueueId, ntfSubId, ntfSubStatus, ntfSubActionTs} ntfAction = do @@ -872,7 +885,7 @@ getActiveNtfToken db = ntfToken ((host, port, keyHash) :. (provider, dt, ntfTokenId, ntfPubKey, ntfPrivKey, ntfDhPubKey, ntfDhPrivKey, ntfDhSecret) :. (ntfTknStatus, ntfTknAction, ntfMode_)) = let ntfServer = ProtocolServer {host, port, keyHash} ntfDhKeys = (ntfDhPubKey, ntfDhPrivKey) - ntfMode = fromMaybe NMOff ntfMode_ + ntfMode = fromMaybe NMPeriodic ntfMode_ in NtfToken {deviceToken = DeviceToken provider dt, ntfServer, ntfTokenId, ntfPubKey, ntfPrivKey, ntfDhKeys, ntfDhSecret, ntfTknStatus, ntfTknAction, ntfMode} getNtfRcvQueue :: DB.Connection -> SMPQueueNtf -> IO (Either StoreError (ConnId, RcvNtfDhSecret)) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20220322_notifications.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20220322_notifications.hs index 4ee6695d4..3ff01fd87 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20220322_notifications.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20220322_notifications.hs @@ -18,7 +18,7 @@ CREATE TABLE ntf_servers ( ) WITHOUT ROWID; CREATE TABLE ntf_tokens ( - provider TEXT NOT NULL, -- apn + provider TEXT NOT NULL, -- apns device_token TEXT NOT NULL, -- ! this field is mislabeled and is actually saved as binary ntf_host TEXT NOT NULL, ntf_port TEXT NOT NULL, diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20220625_v2_ntf_mode.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20220625_v2_ntf_mode.hs index 1ba921be8..d0a82a28f 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20220625_v2_ntf_mode.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20220625_v2_ntf_mode.hs @@ -9,4 +9,6 @@ m20220625_v2_ntf_mode :: Query m20220625_v2_ntf_mode = [sql| ALTER TABLE ntf_tokens ADD COLUMN ntf_mode TEXT NULL; + +DELETE FROM ntf_tokens; |] diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql index daf7cd724..bda5853e2 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql @@ -151,7 +151,7 @@ CREATE TABLE ntf_servers( PRIMARY KEY(ntf_host, ntf_port) ) WITHOUT ROWID; CREATE TABLE ntf_tokens( - provider TEXT NOT NULL, -- apn + provider TEXT NOT NULL, -- apns device_token TEXT NOT NULL, -- ! this field is mislabeled and is actually saved as binary ntf_host TEXT NOT NULL, ntf_port TEXT NOT NULL, diff --git a/src/Simplex/Messaging/Notifications/Client.hs b/src/Simplex/Messaging/Notifications/Client.hs index d73abdfb1..2432687b2 100644 --- a/src/Simplex/Messaging/Notifications/Client.hs +++ b/src/Simplex/Messaging/Notifications/Client.hs @@ -80,7 +80,6 @@ data NtfTknAction = NTARegister | NTAVerify NtfRegCode -- code to verify token | NTACheck - | NTACron Word16 | NTADelete deriving (Show) @@ -89,14 +88,12 @@ instance Encoding NtfTknAction where NTARegister -> "R" NTAVerify code -> smpEncode ('V', code) NTACheck -> "C" - NTACron interval -> smpEncode ('I', interval) NTADelete -> "D" smpP = A.anyChar >>= \case 'R' -> pure NTARegister 'V' -> NTAVerify <$> smpP 'C' -> pure NTACheck - 'I' -> NTACron <$> smpP 'D' -> pure NTADelete _ -> fail "bad NtfTknAction" diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index e832af5c2..fbae87dc5 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -319,7 +319,9 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu TDEL -> do logDebug "TDEL" st <- asks store - atomically $ deleteNtfToken st tknId + qs <- atomically $ deleteNtfToken st tknId + forM_ qs $ \SMPQueueNtf {smpServer, notifierId} -> + atomically $ removeSubscription ca smpServer (SPNotifier, notifierId) cancelInvervalNotifications tknId pure NROk TCRN 0 -> do @@ -371,9 +373,8 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu SDEL -> do logDebug "SDEL" st <- asks store - atomically $ do - deleteNtfSubscription st subId - removeSubscription ca smpServer (SPNotifier, notifierId) + atomically $ deleteNtfSubscription st subId + atomically $ removeSubscription ca smpServer (SPNotifier, notifierId) pure NROk PING -> pure NRPong getId :: m NtfEntityId diff --git a/src/Simplex/Messaging/Notifications/Server/Store.hs b/src/Simplex/Messaging/Notifications/Server/Store.hs index 018608658..2d063d93a 100644 --- a/src/Simplex/Messaging/Notifications/Server/Store.hs +++ b/src/Simplex/Messaging/Notifications/Server/Store.hs @@ -12,7 +12,9 @@ module Simplex.Messaging.Notifications.Server.Store where import Control.Concurrent.STM import Control.Monad import Data.ByteString.Char8 (ByteString) +import Data.Functor (($>)) import qualified Data.Map.Strict as M +import Data.Maybe (catMaybes) import Data.Set (Set) import qualified Data.Set as S import qualified Simplex.Messaging.Crypto as C @@ -121,8 +123,7 @@ removeTokenRegistration st NtfTknData {ntfTknId = tId, token, tknVerifyKey} = >>= mapM_ (\tId' -> when (tId == tId') $ TM.delete k regs) k = C.toPubKey C.pubKeyBytes tknVerifyKey --- TODO delete token subscriptions -deleteNtfToken :: NtfStore -> NtfTokenId -> STM () +deleteNtfToken :: NtfStore -> NtfTokenId -> STM [SMPQueueNtf] deleteNtfToken st tknId = do TM.lookupDelete tknId (tokens st) >>= mapM_ @@ -134,6 +135,22 @@ deleteNtfToken st tknId = do whenM (TM.null tIds) $ TM.delete token regs ) ) + + qs <- + TM.lookupDelete tknId (tokenSubscriptions st) + >>= mapM + ( readTVar + >=> mapM + ( \subId -> do + TM.lookupDelete subId (subscriptions st) + >>= mapM + ( \NtfSubData {smpQueue} -> + TM.delete smpQueue (subscriptionLookup st) $> smpQueue + ) + ) + . S.toList + ) + pure $ maybe [] catMaybes qs where regs = tokenRegistrations st regKey = C.toPubKey C.pubKeyBytes @@ -187,39 +204,3 @@ deleteNtfSubscription st subId = do ts_ <- TM.lookup tokenId (tokenSubscriptions st) forM_ ts_ $ \ts -> modifyTVar' ts $ S.delete subId ) - --- getNtfRec :: NtfStore -> SNtfEntity e -> NtfEntityId -> STM (Maybe (NtfEntityRec e)) --- getNtfRec st ent entId = case ent of --- SToken -> NtfTkn <$$> TM.lookup entId (tokens st) --- SSubscription -> pure Nothing - --- getNtfVerifyKey :: NtfStore -> SNtfEntity e -> NtfEntityId -> STM (Maybe (NtfEntityRec e, C.APublicVerifyKey)) --- getNtfVerifyKey st ent entId = --- getNtfRec st ent entId >>= \case --- Just r@(NtfTkn NtfTknData {tknVerifyKey}) -> pure $ Just (r, tknVerifyKey) --- Just r@(NtfSub NtfSubData {tokenId}) -> --- getNtfRec st SToken tokenId >>= \case --- Just (NtfTkn NtfTknData {tknVerifyKey}) -> pure $ Just (r, tknVerifyKey) --- _ -> pure Nothing --- _ -> pure Nothing - --- mkNtfSubsciption :: SMPQueueNtf -> NtfTokenId -> STM NtfSubsciption --- mkNtfSubsciption smpQueue tokenId = do --- subStatus <- newTVar NSNew --- pure NtfSubsciption {smpQueue, tokenId, subStatus} - --- getNtfSub :: NtfSubscriptionsStore -> NtfSubsciptionId -> STM (Maybe NtfSubsciption) --- getNtfSub st subId = pure Nothing -- maybe (pure $ Left AUTH) (fmap Right . readTVar) . M.lookup subId . subscriptions =<< readTVar st - --- getNtfSubViaSMPQueue :: NtfSubscriptionsStore -> SMPQueueNtf -> STM (Maybe NtfSubsciption) --- getNtfSubViaSMPQueue st smpQueue = pure Nothing - --- -- replace keeping status --- updateNtfSub :: NtfSubscriptionsStore -> NtfSubsciption -> SMPQueueNtf -> NtfTokenId -> C.DhSecretX25519 -> STM (Maybe ()) --- updateNtfSub st sub smpQueue tokenId dhSecret = pure Nothing - --- addNtfSub :: NtfSubscriptionsStore -> NtfSubsciptionId -> NtfSubsciption -> STM (Maybe ()) --- addNtfSub st subId sub = pure Nothing - --- deleteNtfSub :: NtfSubscriptionsStore -> NtfSubsciptionId -> STM () --- deleteNtfSub st subId = pure () diff --git a/tests/AgentTests/NotificationTests.hs b/tests/AgentTests/NotificationTests.hs index 99ca7d8e5..81b78ace1 100644 --- a/tests/AgentTests/NotificationTests.hs +++ b/tests/AgentTests/NotificationTests.hs @@ -31,13 +31,18 @@ import Simplex.Messaging.Protocol (ErrorType (AUTH), MsgFlags (MsgFlags), SMPMsg import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Transport (ATransport) import Simplex.Messaging.Util (tryE) -import System.Directory (removeFile) +import System.Directory (doesFileExist, removeFile) import Test.Hspec import UnliftIO +removeFileIfExists :: FilePath -> IO () +removeFileIfExists filePath = do + fileExists <- doesFileExist filePath + when fileExists $ removeFile filePath + notificationTests :: ATransport -> Spec notificationTests t = - after_ (removeFile testDB) $ do + after_ (removeFile testDB >> removeFileIfExists testDB2) $ do describe "Managing notification tokens" $ do it "should register and verify notification token" $ withAPNSMockServer $ \apns -> @@ -60,6 +65,10 @@ notificationTests t = withSmpServer t $ withAPNSMockServer $ \apns -> withNtfServer t $ testNotificationSubscriptionNewConnection apns + it "should change notifications mode" $ \_ -> + withSmpServer t $ + withAPNSMockServer $ \apns -> + withNtfServer t $ testChangeNotificationsMode apns testNotificationToken :: APNSMockServer -> IO () testNotificationToken APNSMockServer {apnsQ} = do @@ -73,7 +82,6 @@ testNotificationToken APNSMockServer {apnsQ} = do nonce <- C.cbNonce <$> ntfData .-> "nonce" liftIO $ sendApnsResponse APNSRespOk verifyNtfToken a tkn verification nonce - enableNtfCron a tkn 30 NTActive <- checkNtfToken a tkn deleteNtfToken a tkn -- agent deleted this token @@ -110,7 +118,6 @@ testNtfTokenRepeatRegistration APNSMockServer {apnsQ} = do liftIO $ sendApnsResponse' APNSRespOk -- can still use the first verification code, it is the same after decryption verifyNtfToken a tkn verification nonce - enableNtfCron a tkn 30 NTActive <- checkNtfToken a tkn pure () pure () @@ -149,7 +156,6 @@ testNtfTokenSecondRegistration APNSMockServer {apnsQ} = do Left (NTF AUTH) <- tryE $ checkNtfToken a tkn -- and the second is active NTActive <- checkNtfToken a' tkn - enableNtfCron a' tkn 30 pure () pure () @@ -180,7 +186,7 @@ testNtfTokenServerRestart t APNSMockServer {apnsQ} = do liftIO $ sendApnsResponse' APNSRespOk verifyNtfToken a' tkn verification' nonce' NTActive <- checkNtfToken a' tkn - enableNtfCron a' tkn 30 + pure () pure () testNotificationSubscriptionExistingConnection :: APNSMockServer -> IO () @@ -248,7 +254,7 @@ testNotificationSubscriptionNewConnection APNSMockServer {apnsQ} = do Right () <- runExceptT $ do -- alice registers notification token let aliceTkn = DeviceToken PPApns "abcd" - NTRegistered <- registerNtfToken alice aliceTkn NMPeriodic + NTRegistered <- registerNtfToken alice aliceTkn NMInstant APNSMockRequest {notification = APNSNotification {aps = APNSBackground _, notificationData = Just ntfData}, sendApnsResponse} <- atomically $ readTBQueue apnsQ verification <- ntfData .-> "verification" @@ -258,7 +264,7 @@ testNotificationSubscriptionNewConnection APNSMockServer {apnsQ} = do NTActive <- checkNtfToken alice aliceTkn -- bob registers notification token let bobTkn = DeviceToken PPApns "bcde" - NTRegistered <- registerNtfToken bob bobTkn NMPeriodic + NTRegistered <- registerNtfToken bob bobTkn NMInstant APNSMockRequest {notification = APNSNotification {aps = APNSBackground _, notificationData = Just ntfData'}, sendApnsResponse = sendApnsResponse'} <- atomically $ readTBQueue apnsQ verification' <- ntfData' .-> "verification" @@ -302,6 +308,60 @@ testNotificationSubscriptionNewConnection APNSMockServer {apnsQ} = do baseId = 3 msgId = subtract baseId +testChangeNotificationsMode :: APNSMockServer -> IO () +testChangeNotificationsMode APNSMockServer {apnsQ} = do + alice <- getSMPAgentClient agentCfg initAgentServers + bob <- getSMPAgentClient agentCfg {dbFile = testDB2} initAgentServers + Right () <- runExceptT $ do + -- establish connection + (bobId, qInfo) <- createConnection alice SCMInvitation + aliceId <- joinConnection bob qInfo "bob's connInfo" + ("", _, CONF confId "bob's connInfo") <- get alice + allowConnection alice bobId confId "alice's connInfo" + get bob ##> ("", aliceId, INFO "alice's connInfo") + get alice ##> ("", bobId, CON) + get bob ##> ("", aliceId, CON) + -- register notification token, set mode to NMPeriodic + let tkn = DeviceToken PPApns "abcd" + NTRegistered <- registerNtfToken alice tkn NMPeriodic + APNSMockRequest {notification = APNSNotification {aps = APNSBackground _, notificationData = Just ntfData}, sendApnsResponse} <- + atomically $ readTBQueue apnsQ + verification <- ntfData .-> "verification" + verificationNonce <- C.cbNonce <$> ntfData .-> "nonce" + liftIO $ sendApnsResponse APNSRespOk + verifyNtfToken alice tkn verification verificationNonce + NTActive <- checkNtfToken alice tkn + -- send message, no notification + 1 <- msgId <$> sendMessage bob aliceId (SMP.MsgFlags True) "hello" + get bob ##> ("", aliceId, SENT $ baseId + 1) + get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False + ackMessage alice bobId $ baseId + 1 + -- set mode to NMInstant + NTActive <- registerNtfToken alice tkn NMInstant + -- send message, receive notification + liftIO $ threadDelay 500000 + 2 <- msgId <$> sendMessage bob aliceId (SMP.MsgFlags True) "hello again" + get bob ##> ("", aliceId, SENT $ baseId + 2) + void $ messageNotification apnsQ + get alice =##> \case ("", c, Msg "hello again") -> c == bobId; _ -> False + ackMessage alice bobId $ baseId + 2 + -- reset mode to NMPeriodic + NTActive <- registerNtfToken alice tkn NMPeriodic + -- send message, no notification + liftIO $ threadDelay 500000 + 3 <- msgId <$> sendMessage bob aliceId (SMP.MsgFlags True) "hello there" + get bob ##> ("", aliceId, SENT $ baseId + 3) + get alice =##> \case ("", c, Msg "hello there") -> c == bobId; _ -> False + ackMessage alice bobId $ baseId + 3 + -- no notifications should follow + 500000 `timeout` atomically (readTBQueue apnsQ) >>= \case + Nothing -> pure () + _ -> error "unexpected notification" + pure () + where + baseId = 3 + msgId = subtract baseId + messageNotification :: TBQueue APNSMockRequest -> ExceptT AgentErrorType IO (C.CbNonce, ByteString) messageNotification apnsQ = do 500000 `timeout` atomically (readTBQueue apnsQ) >>= \case