mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-16 18:05:21 +00:00
ntf: registerNtfToken rework, notification modes (#431)
* check mode for new subscriptions * check token inside actions * migration - apple -> apns * wip * register logic, modes * update mode, cron config, verify token changes * refactor * fix test * NTFMODE * server: delete subscriptions on deleteToken * refactor markNtfSubscriptionForDeletion * remove NTFMODE * remove subscriptions when token is deleted * refactor * lint * test * check ntfMode Co-authored-by: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com>
This commit is contained in:
@@ -55,7 +55,6 @@ module Simplex.Messaging.Agent
|
||||
setNtfServers,
|
||||
registerNtfToken,
|
||||
verifyNtfToken,
|
||||
enableNtfCron,
|
||||
checkNtfToken,
|
||||
deleteNtfToken,
|
||||
getNtfToken,
|
||||
@@ -83,7 +82,6 @@ import Data.Maybe (isJust)
|
||||
import qualified Data.Text as T
|
||||
import Data.Time.Clock
|
||||
import Data.Time.Clock.System (systemToUTCTime)
|
||||
import Data.Word (Word16)
|
||||
import qualified Database.SQLite.Simple as DB
|
||||
-- import GHC.Conc (unsafeIOToSTM)
|
||||
import Simplex.Messaging.Agent.Client
|
||||
@@ -99,7 +97,7 @@ import qualified Simplex.Messaging.Crypto.Ratchet as CR
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Encoding.String (StrEncoding (..))
|
||||
import Simplex.Messaging.Notifications.Client
|
||||
import Simplex.Messaging.Notifications.Protocol (DeviceToken, NtfRegCode (NtfRegCode), NtfTknStatus (..))
|
||||
import Simplex.Messaging.Notifications.Protocol (DeviceToken, NtfRegCode (NtfRegCode), NtfTknStatus (..), NtfTokenId)
|
||||
import Simplex.Messaging.Notifications.Server.Push.APNS (PNMessageData (..))
|
||||
import Simplex.Messaging.Parsers (parse)
|
||||
import Simplex.Messaging.Protocol (BrokerMsg, ErrorType (AUTH), MsgBody, MsgFlags, SMPMsgMeta)
|
||||
@@ -199,10 +197,6 @@ registerNtfToken c = withAgentEnv c .: registerNtfToken' c
|
||||
verifyNtfToken :: AgentErrorMonad m => AgentClient -> DeviceToken -> ByteString -> C.CbNonce -> m ()
|
||||
verifyNtfToken c = withAgentEnv c .:. verifyNtfToken' c
|
||||
|
||||
-- | Enable/disable periodic notifications
|
||||
enableNtfCron :: AgentErrorMonad m => AgentClient -> DeviceToken -> Word16 -> m ()
|
||||
enableNtfCron c = withAgentEnv c .: enableNtfCron' c
|
||||
|
||||
checkNtfToken :: AgentErrorMonad m => AgentClient -> DeviceToken -> m NtfTknStatus
|
||||
checkNtfToken c = withAgentEnv c . checkNtfToken' c
|
||||
|
||||
@@ -641,7 +635,7 @@ deleteConnection' c connId =
|
||||
atomically $ removeSubscription c connId
|
||||
withStore' c (`deleteConn` connId)
|
||||
ns <- asks ntfSupervisor
|
||||
atomically $ sendNtfSubCommand ns (connId, NSCDelete)
|
||||
atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCDelete)
|
||||
|
||||
-- | Change servers to be used for creating new queues, in Reader monad
|
||||
setSMPServers' :: AgentMonad m => AgentClient -> NonEmpty SMPServer -> m ()
|
||||
@@ -649,31 +643,50 @@ setSMPServers' c servers = do
|
||||
atomically $ writeTVar (smpServers c) servers
|
||||
|
||||
registerNtfToken' :: forall m. AgentMonad m => AgentClient -> DeviceToken -> NotificationsMode -> m NtfTknStatus
|
||||
registerNtfToken' c deviceToken ntfMode =
|
||||
withStore' c (`getDeviceNtfToken` deviceToken) >>= \case
|
||||
(Just tkn@NtfToken {ntfTokenId, ntfTknStatus, ntfTknAction, ntfMode = _currentNtfMode}, prevTokens) -> do
|
||||
mapM_ (deleteToken_ c) prevTokens
|
||||
ns <- asks ntfSupervisor
|
||||
case (ntfTokenId, ntfTknAction) of
|
||||
(Nothing, Just NTARegister) -> registerToken tkn $> NTRegistered
|
||||
registerNtfToken' c suppliedDeviceToken suppliedNtfMode =
|
||||
withStore' c getSavedNtfToken >>= \case
|
||||
Just tkn@NtfToken {deviceToken = savedDeviceToken, ntfTokenId, ntfTknStatus, ntfTknAction, ntfMode = savedNtfMode} -> do
|
||||
status <- case (ntfTokenId, ntfTknAction) of
|
||||
(Nothing, Just NTARegister) -> do
|
||||
when (savedDeviceToken /= suppliedDeviceToken) $ withStore' c $ \db -> updateDeviceToken db tkn suppliedDeviceToken
|
||||
registerToken tkn $> NTRegistered
|
||||
-- TODO minimal time before repeat registration
|
||||
(Just _, Nothing) -> when (ntfTknStatus == NTRegistered) (registerToken tkn) $> NTRegistered
|
||||
(Just tknId, Just (NTAVerify code)) ->
|
||||
t tkn (NTActive, Just NTACheck) $ agentNtfVerifyToken c tknId tkn code
|
||||
(Just tknId, Just (NTACron interval)) ->
|
||||
t tkn (cronSuccess interval) $ agentNtfEnableCron c tknId tkn interval
|
||||
(Just _tknId, Just NTACheck) -> do
|
||||
if ntfTknStatus == NTActive && ntfMode == NMInstant
|
||||
then initializeNtfSubQ c tkn
|
||||
else atomically $ nsUpdateToken ns tkn
|
||||
pure ntfTknStatus -- TODO
|
||||
-- agentNtfCheckToken c tknId tkn >>= \case
|
||||
(Just tknId, Nothing)
|
||||
| savedDeviceToken == suppliedDeviceToken ->
|
||||
when (ntfTknStatus == NTRegistered) (registerToken tkn) $> NTRegistered
|
||||
| otherwise -> replaceToken tknId $> NTRegistered
|
||||
(Just tknId, Just (NTAVerify code))
|
||||
| savedDeviceToken == suppliedDeviceToken ->
|
||||
t tkn (NTActive, Just NTACheck) $ agentNtfVerifyToken c tknId tkn code
|
||||
| otherwise -> replaceToken tknId $> NTRegistered
|
||||
(Just tknId, Just NTACheck)
|
||||
| savedDeviceToken == suppliedDeviceToken -> do
|
||||
ns <- asks ntfSupervisor
|
||||
atomically $ nsUpdateToken ns tkn {ntfMode = suppliedNtfMode}
|
||||
when (ntfTknStatus == NTActive) $ do
|
||||
cron <- asks $ ntfCron . config
|
||||
agentNtfEnableCron c tknId tkn cron
|
||||
when (suppliedNtfMode == NMInstant) $ initializeNtfSubs c
|
||||
when (suppliedNtfMode == NMPeriodic && savedNtfMode == NMInstant) $ smpDeleteNtfSubs c
|
||||
pure ntfTknStatus -- TODO
|
||||
-- agentNtfCheckToken c tknId tkn >>= \case
|
||||
| otherwise -> replaceToken tknId $> NTRegistered
|
||||
(Just tknId, Just NTADelete) -> do
|
||||
agentNtfDeleteToken c tknId tkn
|
||||
withStore' c (`removeNtfToken` tkn)
|
||||
ns <- asks ntfSupervisor
|
||||
atomically $ nsRemoveNtfToken ns
|
||||
pure NTExpired
|
||||
_ -> pure ntfTknStatus
|
||||
withStore' c $ \db -> updateNtfMode db tkn suppliedNtfMode
|
||||
pure status
|
||||
where
|
||||
replaceToken :: NtfTokenId -> m ()
|
||||
replaceToken tknId = do
|
||||
agentNtfReplaceToken c tknId tkn suppliedDeviceToken
|
||||
withStore' c $ \db -> updateDeviceToken db tkn suppliedDeviceToken
|
||||
ns <- asks ntfSupervisor
|
||||
atomically $ nsUpdateToken ns tkn {deviceToken = suppliedDeviceToken, ntfTknStatus = NTRegistered, ntfMode = suppliedNtfMode}
|
||||
_ ->
|
||||
getNtfServer c >>= \case
|
||||
Just ntfServer ->
|
||||
@@ -681,7 +694,7 @@ registerNtfToken' c deviceToken ntfMode =
|
||||
C.SignAlg a -> do
|
||||
tknKeys <- liftIO $ C.generateSignatureKeyPair a
|
||||
dhKeys <- liftIO C.generateKeyPair'
|
||||
let tkn = newNtfToken deviceToken ntfServer tknKeys dhKeys ntfMode
|
||||
let tkn = newNtfToken suppliedDeviceToken ntfServer tknKeys dhKeys suppliedNtfMode
|
||||
withStore' c (`createNtfToken` tkn)
|
||||
registerToken tkn
|
||||
pure NTRegistered
|
||||
@@ -694,52 +707,52 @@ registerNtfToken' c deviceToken ntfMode =
|
||||
let dhSecret = C.dh' srvPubDhKey privDhKey
|
||||
withStore' c $ \db -> updateNtfTokenRegistration db tkn tknId dhSecret
|
||||
ns <- asks ntfSupervisor
|
||||
atomically $ nsUpdateToken ns tkn
|
||||
atomically $ nsUpdateToken ns tkn {deviceToken = suppliedDeviceToken, ntfTknStatus = NTRegistered, ntfMode = suppliedNtfMode}
|
||||
|
||||
-- TODO decrypt verification code
|
||||
verifyNtfToken' :: AgentMonad m => AgentClient -> DeviceToken -> ByteString -> C.CbNonce -> m ()
|
||||
verifyNtfToken' c deviceToken code nonce =
|
||||
withStore' c (`getDeviceNtfToken` deviceToken) >>= \case
|
||||
(Just tkn@NtfToken {ntfTokenId = Just tknId, ntfDhSecret = Just dhSecret}, _) -> do
|
||||
withStore' c getSavedNtfToken >>= \case
|
||||
Just tkn@NtfToken {deviceToken = savedDeviceToken, ntfTokenId = Just tknId, ntfDhSecret = Just dhSecret, ntfMode} -> do
|
||||
when (deviceToken /= savedDeviceToken) . throwError $ CMD PROHIBITED
|
||||
code' <- liftEither . bimap cryptoError NtfRegCode $ C.cbDecrypt dhSecret nonce code
|
||||
void . withToken c tkn (Just (NTConfirmed, NTAVerify code')) (NTActive, Just NTACheck) $ do
|
||||
agentNtfVerifyToken c tknId tkn code'
|
||||
toStatus <-
|
||||
withToken c tkn (Just (NTConfirmed, NTAVerify code')) (NTActive, Just NTACheck) $
|
||||
agentNtfVerifyToken c tknId tkn code'
|
||||
when (toStatus == NTActive) $ do
|
||||
cron <- asks $ ntfCron . config
|
||||
agentNtfEnableCron c tknId tkn cron
|
||||
when (ntfMode == NMInstant) $ initializeNtfSubs c
|
||||
_ -> throwError $ CMD PROHIBITED
|
||||
|
||||
enableNtfCron' :: AgentMonad m => AgentClient -> DeviceToken -> Word16 -> m ()
|
||||
enableNtfCron' c deviceToken interval = do
|
||||
when (interval < 20) . throwError $ CMD PROHIBITED
|
||||
withStore' c (`getDeviceNtfToken` deviceToken) >>= \case
|
||||
(Just tkn@NtfToken {ntfTokenId = Just tknId, ntfTknStatus = NTActive}, _) ->
|
||||
void . withToken c tkn (Just (NTActive, NTACron interval)) (cronSuccess interval) $
|
||||
agentNtfEnableCron c tknId tkn interval
|
||||
_ -> throwError $ CMD PROHIBITED
|
||||
|
||||
cronSuccess :: Word16 -> (NtfTknStatus, Maybe NtfTknAction)
|
||||
cronSuccess interval
|
||||
| interval == 0 = (NTActive, Just NTACheck)
|
||||
| otherwise = (NTActive, Just $ NTACron interval)
|
||||
|
||||
checkNtfToken' :: AgentMonad m => AgentClient -> DeviceToken -> m NtfTknStatus
|
||||
checkNtfToken' c deviceToken =
|
||||
withStore' c (`getDeviceNtfToken` deviceToken) >>= \case
|
||||
(Just tkn@NtfToken {ntfTokenId = Just tknId}, _) -> agentNtfCheckToken c tknId tkn
|
||||
withStore' c getSavedNtfToken >>= \case
|
||||
Just tkn@NtfToken {deviceToken = savedDeviceToken, ntfTokenId = Just tknId} -> do
|
||||
when (deviceToken /= savedDeviceToken) . throwError $ CMD PROHIBITED
|
||||
agentNtfCheckToken c tknId tkn
|
||||
_ -> throwError $ CMD PROHIBITED
|
||||
|
||||
deleteNtfToken' :: AgentMonad m => AgentClient -> DeviceToken -> m ()
|
||||
deleteNtfToken' c deviceToken =
|
||||
withStore' c (`getDeviceNtfToken` deviceToken) >>= \case
|
||||
(Just tkn, _) -> deleteToken_ c tkn
|
||||
withStore' c getSavedNtfToken >>= \case
|
||||
Just tkn@NtfToken {deviceToken = savedDeviceToken} -> do
|
||||
when (deviceToken /= savedDeviceToken) . throwError $ CMD PROHIBITED
|
||||
deleteToken_ c tkn
|
||||
smpDeleteNtfSubs c
|
||||
_ -> throwError $ CMD PROHIBITED
|
||||
|
||||
getNtfToken' :: AgentMonad m => AgentClient -> m (DeviceToken, NtfTknStatus, NotificationsMode)
|
||||
getNtfToken' _c = throwError $ CMD PROHIBITED
|
||||
getNtfToken' c =
|
||||
withStore' c getSavedNtfToken >>= \case
|
||||
Just NtfToken {deviceToken, ntfTknStatus, ntfMode} -> pure (deviceToken, ntfTknStatus, ntfMode)
|
||||
_ -> throwError $ CMD PROHIBITED
|
||||
|
||||
-- | Delete notification subscription for connection, in Reader monad
|
||||
deleteNtfSub' :: AgentMonad m => AgentClient -> ConnId -> m ()
|
||||
deleteNtfSub' _c connId = do
|
||||
ns <- asks ntfSupervisor
|
||||
atomically $ sendNtfSubCommand ns (connId, NSCDelete)
|
||||
atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCDelete)
|
||||
|
||||
deleteToken_ :: AgentMonad m => AgentClient -> NtfToken -> m ()
|
||||
deleteToken_ c tkn@NtfToken {ntfTokenId, ntfTknStatus} = do
|
||||
@@ -764,9 +777,7 @@ withToken c tkn@NtfToken {deviceToken, ntfMode} from_ (toStatus, toAction_) f =
|
||||
Right _ -> do
|
||||
withStore' c $ \db -> updateNtfToken db tkn toStatus toAction_
|
||||
let updatedToken = tkn {ntfTknStatus = toStatus, ntfTknAction = toAction_}
|
||||
if toStatus == NTActive && ntfMode == NMInstant
|
||||
then initializeNtfSubQ c updatedToken
|
||||
else atomically $ nsUpdateToken ns updatedToken
|
||||
atomically $ nsUpdateToken ns updatedToken
|
||||
pure toStatus
|
||||
Left e@(NTF AUTH) -> do
|
||||
withStore' c $ \db -> removeNtfToken db tkn
|
||||
@@ -775,14 +786,18 @@ withToken c tkn@NtfToken {deviceToken, ntfMode} from_ (toStatus, toAction_) f =
|
||||
throwError e
|
||||
Left e -> throwError e
|
||||
|
||||
initializeNtfSubQ :: AgentMonad m => AgentClient -> NtfToken -> m ()
|
||||
initializeNtfSubQ c tkn = do
|
||||
initializeNtfSubs :: AgentMonad m => AgentClient -> m ()
|
||||
initializeNtfSubs c = do
|
||||
ns <- asks ntfSupervisor
|
||||
connIds <- atomically $ do
|
||||
nsUpdateToken ns tkn
|
||||
getSubscriptions c
|
||||
connIds <- atomically $ getSubscriptions c
|
||||
forM_ connIds $ \connId -> atomically $ sendNtfSubCommand ns (connId, NSCCreate)
|
||||
|
||||
smpDeleteNtfSubs :: AgentMonad m => AgentClient -> m ()
|
||||
smpDeleteNtfSubs c = do
|
||||
ns <- asks ntfSupervisor
|
||||
connIds <- atomically $ getSubscriptions c
|
||||
forM_ connIds $ \connId -> atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCSmpDelete)
|
||||
|
||||
-- TODO
|
||||
-- There should probably be another function to cancel all subscriptions that would flush the queue first,
|
||||
-- so that supervisor stops processing pending commands?
|
||||
|
||||
@@ -46,6 +46,7 @@ import Simplex.Messaging.Version
|
||||
import System.Random (StdGen, newStdGen)
|
||||
import UnliftIO (Async)
|
||||
import UnliftIO.STM
|
||||
import Data.Word (Word16)
|
||||
|
||||
-- | Agent monad with MonadReader Env and MonadError AgentErrorType
|
||||
type AgentMonad m = (MonadUnliftIO m, MonadReader Env m, MonadError AgentErrorType m)
|
||||
@@ -67,6 +68,7 @@ data AgentConfig = AgentConfig
|
||||
reconnectInterval :: RetryInterval,
|
||||
helloTimeout :: NominalDiffTime,
|
||||
resubscriptionConcurrency :: Int,
|
||||
ntfCron :: Word16,
|
||||
ntfWorkerThrottle :: Int,
|
||||
ntfSubCheckInterval :: NominalDiffTime,
|
||||
ntfMaxMessages :: Int,
|
||||
@@ -101,6 +103,7 @@ defaultAgentConfig =
|
||||
reconnectInterval = defaultReconnectInterval,
|
||||
helloTimeout = 2 * nominalDay,
|
||||
resubscriptionConcurrency = 16,
|
||||
ntfCron = 20, -- minutes
|
||||
ntfWorkerThrottle = 1000000, -- microseconds
|
||||
ntfSubCheckInterval = nominalDay,
|
||||
ntfMaxMessages = 4,
|
||||
@@ -138,7 +141,7 @@ data NtfSupervisor = NtfSupervisor
|
||||
ntfSMPWorkers :: TMap SMPServer (TMVar (), Async ())
|
||||
}
|
||||
|
||||
data NtfSupervisorCommand = NSCCreate | NSCDelete | NSCNtfWorker NtfServer | NSCNtfSMPWorker SMPServer
|
||||
data NtfSupervisorCommand = NSCCreate | NSCDelete | NSCSmpDelete | NSCNtfWorker NtfServer | NSCNtfSMPWorker SMPServer
|
||||
|
||||
newNtfSubSupervisor :: Natural -> STM NtfSupervisor
|
||||
newNtfSubSupervisor qSize = do
|
||||
|
||||
@@ -27,7 +27,7 @@ import qualified Data.Map.Strict as M
|
||||
import Data.Time (UTCTime, addUTCTime, diffUTCTime, getCurrentTime, nominalDiffTimeToSeconds)
|
||||
import Simplex.Messaging.Agent.Client
|
||||
import Simplex.Messaging.Agent.Env.SQLite
|
||||
import Simplex.Messaging.Agent.Protocol (AgentErrorType (..), BrokerErrorType (..), ConnId)
|
||||
import Simplex.Messaging.Agent.Protocol (AgentErrorType (..), BrokerErrorType (..), ConnId, NotificationsMode (..))
|
||||
import qualified Simplex.Messaging.Agent.Protocol as AP
|
||||
import Simplex.Messaging.Agent.RetryInterval
|
||||
import Simplex.Messaging.Agent.Store
|
||||
@@ -90,10 +90,16 @@ processNtfSub c (connId, cmd) = do
|
||||
addNtfWorker ntfServer
|
||||
_ -> pure () -- error - notification server not configured
|
||||
NSCDelete -> do
|
||||
withStore' c (`markNtfSubscriptionForDeletion` connId)
|
||||
withStore' c $ \db -> markNtfSubscriptionForDeletion db connId (NtfSubAction NSADelete)
|
||||
case ntfServer_ of
|
||||
(Just ntfServer) -> addNtfWorker ntfServer
|
||||
_ -> pure ()
|
||||
NSCSmpDelete -> do
|
||||
withStore' c (`getRcvQueue` connId) >>= \case
|
||||
Right RcvQueue {server = smpServer} -> do
|
||||
withStore' c $ \db -> markNtfSubscriptionForDeletion db connId (NtfSubSMPAction NSASmpDelete)
|
||||
addNtfSMPWorker smpServer
|
||||
Left _ -> pure ()
|
||||
NSCNtfWorker ntfServer ->
|
||||
addNtfWorker ntfServer
|
||||
NSCNtfSMPWorker smpServer ->
|
||||
@@ -119,57 +125,61 @@ processNtfSub c (connId, cmd) = do
|
||||
runNtfWorker :: forall m. AgentMonad m => AgentClient -> NtfServer -> TMVar () -> m ()
|
||||
runNtfWorker c srv doWork = forever $ do
|
||||
void . atomically $ readTMVar doWork
|
||||
getNtfToken >>= \case
|
||||
Just tkn -> do
|
||||
nextSub_ <- withStore' c (`getNextNtfSubAction` srv)
|
||||
case nextSub_ of
|
||||
Nothing -> noWorkToDo
|
||||
Just ntfSub@(NtfSubscription {connId}, _) -> do
|
||||
ri <- asks $ reconnectInterval . config
|
||||
withRetryInterval ri $ \loop ->
|
||||
processAction tkn ntfSub
|
||||
`catchError` ( \e ->
|
||||
case e of
|
||||
BROKER NETWORK -> loop
|
||||
BROKER TIMEOUT -> loop
|
||||
_ -> ntfInternalError c connId (show e)
|
||||
)
|
||||
_ -> noWorkToDo
|
||||
nextSub_ <- withStore' c (`getNextNtfSubAction` srv)
|
||||
case nextSub_ of
|
||||
Nothing -> noWorkToDo
|
||||
Just ntfSub@(NtfSubscription {connId}, _) -> do
|
||||
ri <- asks $ reconnectInterval . config
|
||||
withRetryInterval ri $ \loop ->
|
||||
processAction ntfSub
|
||||
`catchError` ( \e ->
|
||||
case e of
|
||||
BROKER NETWORK -> loop
|
||||
BROKER TIMEOUT -> loop
|
||||
_ -> ntfInternalError c connId (show e)
|
||||
)
|
||||
throttle <- asks $ ntfWorkerThrottle . config
|
||||
liftIO $ threadDelay throttle
|
||||
where
|
||||
noWorkToDo = void . atomically $ tryTakeTMVar doWork
|
||||
processAction :: NtfToken -> (NtfSubscription, NtfSubAction) -> m ()
|
||||
processAction tkn@NtfToken {ntfTokenId, ntfTknStatus} (ntfSub@NtfSubscription {connId, smpServer, ntfSubId}, ntfSubAction) = do
|
||||
processAction :: (NtfSubscription, NtfSubAction) -> m ()
|
||||
processAction (ntfSub@NtfSubscription {connId, smpServer, ntfSubId}, ntfSubAction) = do
|
||||
ts <- liftIO getCurrentTime
|
||||
unlessM (rescheduleAction doWork ts ntfSub) $
|
||||
case ntfSubAction of
|
||||
NSACreate -> do
|
||||
rq_ <- withStore' c (`getRcvQueue` connId)
|
||||
case (rq_, ntfTknStatus, ntfTokenId) of
|
||||
(Right RcvQueue {clientNtfCreds = Just ClientNtfCreds {ntfPrivateKey, notifierId}}, NTActive, Just tknId) -> do
|
||||
nSubId <- agentNtfCreateSubscription c tknId tkn (SMPQueueNtf smpServer notifierId) ntfPrivateKey
|
||||
let actionTs = addUTCTime 30 ts
|
||||
withStore' c $ \db ->
|
||||
updateNtfSubscription db connId ntfSub {ntfSubId = Just nSubId, ntfSubStatus = NASCreated NSNew, ntfSubActionTs = actionTs} (NtfSubAction NSACheck)
|
||||
_ -> ntfInternalError c connId "NSACreate - no notifier queue credentials or token not active"
|
||||
NSACheck -> case ntfSubId of
|
||||
Just nSubId ->
|
||||
agentNtfCheckSubscription c nSubId tkn >>= \case
|
||||
NSNew -> updateSubNextCheck ts NSNew
|
||||
NSPending -> updateSubNextCheck ts NSPending
|
||||
NSActive -> updateSubNextCheck ts NSActive
|
||||
NSEnd -> updateSubNextCheck ts NSEnd
|
||||
NSSMPAuth -> updateSub (NASCreated NSSMPAuth) (NtfSubAction NSADelete) ts
|
||||
Nothing -> ntfInternalError c connId "NSACheck - no subscription ID"
|
||||
NSACreate ->
|
||||
getNtfToken >>= \case
|
||||
Just tkn@NtfToken {ntfTokenId = Just tknId, ntfTknStatus = NTActive, ntfMode = NMInstant} -> do
|
||||
RcvQueue {clientNtfCreds} <- withStore c (`getRcvQueue` connId)
|
||||
case clientNtfCreds of
|
||||
Just ClientNtfCreds {ntfPrivateKey, notifierId} -> do
|
||||
nSubId <- agentNtfCreateSubscription c tknId tkn (SMPQueueNtf smpServer notifierId) ntfPrivateKey
|
||||
let actionTs = addUTCTime 30 ts
|
||||
withStore' c $ \db ->
|
||||
updateNtfSubscription db connId ntfSub {ntfSubId = Just nSubId, ntfSubStatus = NASCreated NSNew, ntfSubActionTs = actionTs} (NtfSubAction NSACheck)
|
||||
_ -> ntfInternalError c connId "NSACreate - no notifier queue credentials"
|
||||
_ -> ntfInternalError c connId "NSACreate - no active token"
|
||||
NSACheck ->
|
||||
getNtfToken >>= \case
|
||||
Just tkn ->
|
||||
case ntfSubId of
|
||||
Just nSubId ->
|
||||
agentNtfCheckSubscription c nSubId tkn >>= \case
|
||||
NSNew -> updateSubNextCheck ts NSNew
|
||||
NSPending -> updateSubNextCheck ts NSPending
|
||||
NSActive -> updateSubNextCheck ts NSActive
|
||||
NSEnd -> updateSubNextCheck ts NSEnd
|
||||
NSSMPAuth -> updateSub (NASCreated NSSMPAuth) (NtfSubAction NSADelete) ts -- TODO re-create subscription?
|
||||
Nothing -> ntfInternalError c connId "NSACheck - no subscription ID"
|
||||
_ -> ntfInternalError c connId "NSACheck - no active token"
|
||||
NSADelete -> case ntfSubId of
|
||||
Just nSubId ->
|
||||
agentNtfDeleteSubscription c nSubId tkn
|
||||
(getNtfToken >>= \tkn -> forM_ tkn $ agentNtfDeleteSubscription c nSubId)
|
||||
`E.finally` do
|
||||
withStore' c $ \db ->
|
||||
updateNtfSubscription db connId ntfSub {ntfSubId = Nothing, ntfSubStatus = NASOff, ntfSubActionTs = ts} (NtfSubSMPAction NSASmpDelete)
|
||||
ns <- asks ntfSupervisor
|
||||
atomically $ sendNtfSubCommand ns (connId, NSCNtfSMPWorker smpServer)
|
||||
atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCNtfSMPWorker smpServer)
|
||||
Nothing -> ntfInternalError c connId "NSADelete - no subscription ID"
|
||||
where
|
||||
updateSubNextCheck ts toStatus = do
|
||||
@@ -183,35 +193,32 @@ runNtfWorker c srv doWork = forever $ do
|
||||
runNtfSMPWorker :: forall m. AgentMonad m => AgentClient -> SMPServer -> TMVar () -> m ()
|
||||
runNtfSMPWorker c srv doWork = forever $ do
|
||||
void . atomically $ readTMVar doWork
|
||||
getNtfToken >>= \case
|
||||
Just tkn -> do
|
||||
nextSub_ <- withStore' c (`getNextNtfSubSMPAction` srv)
|
||||
case nextSub_ of
|
||||
Nothing -> noWorkToDo
|
||||
Just ntfSub@(NtfSubscription {connId}, _) -> do
|
||||
ri <- asks $ reconnectInterval . config
|
||||
withRetryInterval ri $ \loop ->
|
||||
processAction tkn ntfSub
|
||||
`catchError` ( \e ->
|
||||
case e of
|
||||
BROKER NETWORK -> loop
|
||||
BROKER TIMEOUT -> loop
|
||||
_ -> ntfInternalError c connId (show e)
|
||||
)
|
||||
_ -> noWorkToDo
|
||||
nextSub_ <- withStore' c (`getNextNtfSubSMPAction` srv)
|
||||
case nextSub_ of
|
||||
Nothing -> noWorkToDo
|
||||
Just ntfSub@(NtfSubscription {connId}, _) -> do
|
||||
ri <- asks $ reconnectInterval . config
|
||||
withRetryInterval ri $ \loop ->
|
||||
processAction ntfSub
|
||||
`catchError` ( \e ->
|
||||
case e of
|
||||
BROKER NETWORK -> loop
|
||||
BROKER TIMEOUT -> loop
|
||||
_ -> ntfInternalError c connId (show e)
|
||||
)
|
||||
throttle <- asks $ ntfWorkerThrottle . config
|
||||
liftIO $ threadDelay throttle
|
||||
where
|
||||
noWorkToDo = void . atomically $ tryTakeTMVar doWork
|
||||
processAction :: NtfToken -> (NtfSubscription, NtfSubSMPAction) -> m ()
|
||||
processAction NtfToken {ntfTknStatus} (ntfSub@NtfSubscription {connId, ntfServer}, ntfSubAction) = do
|
||||
processAction :: (NtfSubscription, NtfSubSMPAction) -> m ()
|
||||
processAction (ntfSub@NtfSubscription {connId, ntfServer}, ntfSubAction) = do
|
||||
ts <- liftIO getCurrentTime
|
||||
unlessM (rescheduleAction doWork ts ntfSub) $
|
||||
case ntfSubAction of
|
||||
NSASmpKey -> do
|
||||
rq_ <- withStore' c (`getRcvQueue` connId)
|
||||
case (rq_, ntfTknStatus) of
|
||||
(Right rq, NTActive) -> do
|
||||
NSASmpKey ->
|
||||
getNtfToken >>= \case
|
||||
Just NtfToken {ntfTknStatus = NTActive, ntfMode = NMInstant} -> do
|
||||
rq <- withStore c (`getRcvQueue` connId)
|
||||
C.SignAlg a <- asks (cmdSignAlg . config)
|
||||
(ntfPublicKey, ntfPrivateKey) <- liftIO $ C.generateSignatureKeyPair a
|
||||
(rcvNtfPubDhKey, rcvNtfPrivDhKey) <- liftIO C.generateKeyPair'
|
||||
@@ -222,7 +229,7 @@ runNtfSMPWorker c srv doWork = forever $ do
|
||||
updateNtfSubscription st connId ntfSub {ntfQueueId = Just notifierId, ntfSubStatus = NASKey, ntfSubActionTs = ts} (NtfSubAction NSACreate)
|
||||
ns <- asks ntfSupervisor
|
||||
atomically $ sendNtfSubCommand ns (connId, NSCNtfWorker ntfServer)
|
||||
_ -> ntfInternalError c connId "NSASmpKey - no rcv queue or token not active"
|
||||
_ -> ntfInternalError c connId "NSASmpKey - no active token"
|
||||
NSASmpDelete -> do
|
||||
rq_ <- withStore' c (`getRcvQueue` connId)
|
||||
forM_ rq_ $ \rq -> disableQueueNotifications c rq
|
||||
@@ -263,7 +270,11 @@ nsRemoveNtfToken ns = writeTVar (ntfTkn ns) Nothing
|
||||
sendNtfSubCommand :: NtfSupervisor -> (ConnId, NtfSupervisorCommand) -> STM ()
|
||||
sendNtfSubCommand ns cmd =
|
||||
readTVar (ntfTkn ns)
|
||||
>>= mapM_ (\NtfToken {ntfTknStatus} -> when (ntfTknStatus == NTActive) $ writeTBQueue (ntfSubQ ns) cmd)
|
||||
>>= mapM_
|
||||
( \NtfToken {ntfTknStatus, ntfMode} ->
|
||||
when (ntfTknStatus == NTActive && ntfMode == NMInstant) $
|
||||
writeTBQueue (ntfSubQ ns) cmd
|
||||
)
|
||||
|
||||
closeNtfSupervisor :: NtfSupervisor -> IO ()
|
||||
closeNtfSupervisor ns = do
|
||||
|
||||
@@ -133,7 +133,6 @@ import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Crypto.Ratchet (E2ERatchetParams, E2ERatchetParamsUri)
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Notifications.Protocol (NtfTknStatus)
|
||||
import Simplex.Messaging.Parsers
|
||||
import Simplex.Messaging.Protocol
|
||||
( ErrorType,
|
||||
@@ -231,23 +230,20 @@ data ACommand (p :: AParty) where
|
||||
OK :: ACommand Agent
|
||||
ERR :: AgentErrorType -> ACommand Agent
|
||||
SUSPENDED :: ACommand Agent
|
||||
NTFMODE :: NtfTknStatus -> NotificationsMode -> ACommand Agent
|
||||
|
||||
deriving instance Eq (ACommand p)
|
||||
|
||||
deriving instance Show (ACommand p)
|
||||
|
||||
data NotificationsMode = NMOff | NMPeriodic | NMInstant
|
||||
data NotificationsMode = NMPeriodic | NMInstant
|
||||
deriving (Eq, Show)
|
||||
|
||||
instance StrEncoding NotificationsMode where
|
||||
strEncode = \case
|
||||
NMOff -> "OFF"
|
||||
NMPeriodic -> "PERIODIC"
|
||||
NMInstant -> "INSTANT"
|
||||
strP =
|
||||
A.takeTill (== ' ') >>= \case
|
||||
"OFF" -> pure NMOff
|
||||
"PERIODIC" -> pure NMPeriodic
|
||||
"INSTANT" -> pure NMInstant
|
||||
_ -> fail "bad NotificationsMode"
|
||||
@@ -948,7 +944,6 @@ serializeCommand = \case
|
||||
ERR e -> "ERR " <> strEncode e
|
||||
OK -> "OK"
|
||||
SUSPENDED -> "SUSPENDED"
|
||||
NTFMODE t m -> "NTFMODE " <> smpEncode t <> " " <> strEncode m
|
||||
where
|
||||
showTs :: UTCTime -> ByteString
|
||||
showTs = B.pack . formatISO8601Millis
|
||||
|
||||
@@ -66,10 +66,11 @@ module Simplex.Messaging.Agent.Store.SQLite
|
||||
updateRatchet,
|
||||
-- Notification device token persistence
|
||||
createNtfToken,
|
||||
getDeviceNtfToken,
|
||||
getSavedNtfToken,
|
||||
updateNtfTokenRegistration,
|
||||
updateDeviceToken,
|
||||
updateNtfMode,
|
||||
updateNtfToken,
|
||||
setNtfTokenNtfMode,
|
||||
removeNtfToken,
|
||||
-- Notification subscription persistence
|
||||
getNtfSubscription,
|
||||
@@ -96,12 +97,12 @@ import Control.Concurrent (threadDelay)
|
||||
import Control.Concurrent.STM (stateTVar)
|
||||
import Control.Monad.Except
|
||||
import Crypto.Random (ChaChaDRG, randomBytesGenerate)
|
||||
import Data.Bifunctor (first, second)
|
||||
import Data.Bifunctor (second)
|
||||
import Data.ByteString (ByteString)
|
||||
import qualified Data.ByteString.Base64.URL as U
|
||||
import Data.Char (toLower)
|
||||
import Data.Functor (($>))
|
||||
import Data.List (find, foldl', partition)
|
||||
import Data.List (find, foldl')
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (fromMaybe, listToMaybe)
|
||||
import Data.Text (Text)
|
||||
@@ -122,7 +123,7 @@ import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Crypto.Ratchet (RatchetX448, SkippedMsgDiff (..), SkippedMsgKeys)
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Notifications.Client (NtfAgentSubStatus (NASDeleted), NtfServer, NtfSubAction (NSADelete), NtfSubOrSMPAction (..), NtfSubSMPAction, NtfSubscription (..), NtfTknAction, NtfToken (..))
|
||||
import Simplex.Messaging.Notifications.Client (NtfAgentSubStatus (..), NtfServer, NtfSubAction (..), NtfSubOrSMPAction (..), NtfSubSMPAction (..), NtfSubscription (..), NtfTknAction (..), NtfToken (..))
|
||||
import Simplex.Messaging.Notifications.Protocol (DeviceToken (..), NtfSubscriptionId, NtfTknStatus (..), NtfTokenId, SMPQueueNtf (..))
|
||||
import Simplex.Messaging.Parsers (blobFieldParser, fromTextField_)
|
||||
import Simplex.Messaging.Protocol (MsgBody, MsgFlags, ProtocolServer (..), RcvNtfDhSecret)
|
||||
@@ -634,25 +635,23 @@ createNtfToken db NtfToken {deviceToken = DeviceToken provider token, ntfServer
|
||||
|]
|
||||
((provider, token, host, port, ntfTokenId, ntfPubKey, ntfPrivKey, ntfDhPubKey, ntfDhPrivKey, ntfDhSecret) :. (ntfTknStatus, ntfTknAction, ntfMode))
|
||||
|
||||
getDeviceNtfToken :: DB.Connection -> DeviceToken -> IO (Maybe NtfToken, [NtfToken])
|
||||
getDeviceNtfToken db t = do
|
||||
tokens <-
|
||||
map ntfToken
|
||||
<$> DB.query_
|
||||
db
|
||||
[sql|
|
||||
SELECT s.ntf_host, s.ntf_port, s.ntf_key_hash,
|
||||
t.provider, t.device_token, t.tkn_id, t.tkn_pub_key, t.tkn_priv_key, t.tkn_pub_dh_key, t.tkn_priv_dh_key, t.tkn_dh_secret,
|
||||
t.tkn_status, t.tkn_action, t.ntf_mode
|
||||
FROM ntf_tokens t
|
||||
JOIN ntf_servers s USING (ntf_host, ntf_port)
|
||||
|]
|
||||
pure . first listToMaybe $ partition ((t ==) . deviceToken) tokens
|
||||
getSavedNtfToken :: DB.Connection -> IO (Maybe NtfToken)
|
||||
getSavedNtfToken db = do
|
||||
maybeFirstRow ntfToken $
|
||||
DB.query_
|
||||
db
|
||||
[sql|
|
||||
SELECT s.ntf_host, s.ntf_port, s.ntf_key_hash,
|
||||
t.provider, t.device_token, t.tkn_id, t.tkn_pub_key, t.tkn_priv_key, t.tkn_pub_dh_key, t.tkn_priv_dh_key, t.tkn_dh_secret,
|
||||
t.tkn_status, t.tkn_action, t.ntf_mode
|
||||
FROM ntf_tokens t
|
||||
JOIN ntf_servers s USING (ntf_host, ntf_port)
|
||||
|]
|
||||
where
|
||||
ntfToken ((host, port, keyHash) :. (provider, dt, ntfTokenId, ntfPubKey, ntfPrivKey, ntfDhPubKey, ntfDhPrivKey, ntfDhSecret) :. (ntfTknStatus, ntfTknAction, ntfMode_)) =
|
||||
let ntfServer = ProtocolServer {host, port, keyHash}
|
||||
ntfDhKeys = (ntfDhPubKey, ntfDhPrivKey)
|
||||
ntfMode = fromMaybe NMOff ntfMode_
|
||||
ntfMode = fromMaybe NMPeriodic ntfMode_
|
||||
in NtfToken {deviceToken = DeviceToken provider dt, ntfServer, ntfTokenId, ntfPubKey, ntfPrivKey, ntfDhKeys, ntfDhSecret, ntfTknStatus, ntfTknAction, ntfMode}
|
||||
|
||||
updateNtfTokenRegistration :: DB.Connection -> NtfToken -> NtfTokenId -> C.DhSecretX25519 -> IO ()
|
||||
@@ -667,6 +666,30 @@ updateNtfTokenRegistration db NtfToken {deviceToken = DeviceToken provider token
|
||||
|]
|
||||
(tknId, ntfDhSecret, NTRegistered, Nothing :: Maybe NtfTknAction, updatedAt, provider, token, host, port)
|
||||
|
||||
updateDeviceToken :: DB.Connection -> NtfToken -> DeviceToken -> IO ()
|
||||
updateDeviceToken db NtfToken {deviceToken = DeviceToken provider token, ntfServer = ProtocolServer {host, port}} (DeviceToken toProvider toToken) = do
|
||||
updatedAt <- getCurrentTime
|
||||
DB.execute
|
||||
db
|
||||
[sql|
|
||||
UPDATE ntf_tokens
|
||||
SET provider = ?, device_token = ?, tkn_status = ?, tkn_action = ?, updated_at = ?
|
||||
WHERE provider = ? AND device_token = ? AND ntf_host = ? AND ntf_port = ?
|
||||
|]
|
||||
(toProvider, toToken, NTRegistered, Nothing :: Maybe NtfTknAction, updatedAt, provider, token, host, port)
|
||||
|
||||
updateNtfMode :: DB.Connection -> NtfToken -> NotificationsMode -> IO ()
|
||||
updateNtfMode db NtfToken {deviceToken = DeviceToken provider token, ntfServer = ProtocolServer {host, port}} ntfMode = do
|
||||
updatedAt <- getCurrentTime
|
||||
DB.execute
|
||||
db
|
||||
[sql|
|
||||
UPDATE ntf_tokens
|
||||
SET ntf_mode = ?, updated_at = ?
|
||||
WHERE provider = ? AND device_token = ? AND ntf_host = ? AND ntf_port = ?
|
||||
|]
|
||||
(ntfMode, updatedAt, provider, token, host, port)
|
||||
|
||||
updateNtfToken :: DB.Connection -> NtfToken -> NtfTknStatus -> Maybe NtfTknAction -> IO ()
|
||||
updateNtfToken db NtfToken {deviceToken = DeviceToken provider token, ntfServer = ProtocolServer {host, port}} tknStatus tknAction = do
|
||||
updatedAt <- getCurrentTime
|
||||
@@ -679,18 +702,6 @@ updateNtfToken db NtfToken {deviceToken = DeviceToken provider token, ntfServer
|
||||
|]
|
||||
(tknStatus, tknAction, updatedAt, provider, token, host, port)
|
||||
|
||||
setNtfTokenNtfMode :: DB.Connection -> NtfToken -> NotificationsMode -> IO ()
|
||||
setNtfTokenNtfMode db NtfToken {deviceToken = DeviceToken provider token, ntfServer = ProtocolServer {host, port}} ntfMode = do
|
||||
updatedAt <- getCurrentTime
|
||||
DB.execute
|
||||
db
|
||||
[sql|
|
||||
UPDATE ntf_tokens
|
||||
SET ntf_mode = ?, updated_at = ?
|
||||
WHERE provider = ? AND device_token = ? AND ntf_host = ? AND ntf_port = ?
|
||||
|]
|
||||
(ntfMode, updatedAt, provider, token, host, port)
|
||||
|
||||
removeNtfToken :: DB.Connection -> NtfToken -> IO ()
|
||||
removeNtfToken db NtfToken {deviceToken = DeviceToken provider token, ntfServer = ProtocolServer {host, port}} =
|
||||
DB.execute
|
||||
@@ -737,8 +748,8 @@ createNtfSubscription db NtfSubscription {connId, smpServer = (SMPServer host po
|
||||
where
|
||||
(ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction ntfAction
|
||||
|
||||
markNtfSubscriptionForDeletion :: DB.Connection -> ConnId -> IO ()
|
||||
markNtfSubscriptionForDeletion db connId = do
|
||||
markNtfSubscriptionForDeletion :: DB.Connection -> ConnId -> NtfSubOrSMPAction -> IO ()
|
||||
markNtfSubscriptionForDeletion db connId ntfAction = do
|
||||
updatedAt <- getCurrentTime
|
||||
DB.execute
|
||||
db
|
||||
@@ -747,7 +758,9 @@ markNtfSubscriptionForDeletion db connId = do
|
||||
SET ntf_sub_action = ?, ntf_sub_smp_action = ?, ntf_sub_action_ts = ?, updated_by_supervisor = ?, updated_at = ?
|
||||
WHERE conn_id = ?
|
||||
|]
|
||||
(Just NSADelete, Nothing :: Maybe NtfSubSMPAction, updatedAt, True, updatedAt, connId)
|
||||
(ntfSubAction, ntfSubSMPAction, updatedAt, True, updatedAt, connId)
|
||||
where
|
||||
(ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction ntfAction
|
||||
|
||||
updateNtfSubscription :: DB.Connection -> ConnId -> NtfSubscription -> NtfSubOrSMPAction -> IO ()
|
||||
updateNtfSubscription db connId NtfSubscription {ntfQueueId, ntfSubId, ntfSubStatus, ntfSubActionTs} ntfAction = do
|
||||
@@ -872,7 +885,7 @@ getActiveNtfToken db =
|
||||
ntfToken ((host, port, keyHash) :. (provider, dt, ntfTokenId, ntfPubKey, ntfPrivKey, ntfDhPubKey, ntfDhPrivKey, ntfDhSecret) :. (ntfTknStatus, ntfTknAction, ntfMode_)) =
|
||||
let ntfServer = ProtocolServer {host, port, keyHash}
|
||||
ntfDhKeys = (ntfDhPubKey, ntfDhPrivKey)
|
||||
ntfMode = fromMaybe NMOff ntfMode_
|
||||
ntfMode = fromMaybe NMPeriodic ntfMode_
|
||||
in NtfToken {deviceToken = DeviceToken provider dt, ntfServer, ntfTokenId, ntfPubKey, ntfPrivKey, ntfDhKeys, ntfDhSecret, ntfTknStatus, ntfTknAction, ntfMode}
|
||||
|
||||
getNtfRcvQueue :: DB.Connection -> SMPQueueNtf -> IO (Either StoreError (ConnId, RcvNtfDhSecret))
|
||||
|
||||
@@ -18,7 +18,7 @@ CREATE TABLE ntf_servers (
|
||||
) WITHOUT ROWID;
|
||||
|
||||
CREATE TABLE ntf_tokens (
|
||||
provider TEXT NOT NULL, -- apn
|
||||
provider TEXT NOT NULL, -- apns
|
||||
device_token TEXT NOT NULL, -- ! this field is mislabeled and is actually saved as binary
|
||||
ntf_host TEXT NOT NULL,
|
||||
ntf_port TEXT NOT NULL,
|
||||
|
||||
@@ -9,4 +9,6 @@ m20220625_v2_ntf_mode :: Query
|
||||
m20220625_v2_ntf_mode =
|
||||
[sql|
|
||||
ALTER TABLE ntf_tokens ADD COLUMN ntf_mode TEXT NULL;
|
||||
|
||||
DELETE FROM ntf_tokens;
|
||||
|]
|
||||
|
||||
@@ -151,7 +151,7 @@ CREATE TABLE ntf_servers(
|
||||
PRIMARY KEY(ntf_host, ntf_port)
|
||||
) WITHOUT ROWID;
|
||||
CREATE TABLE ntf_tokens(
|
||||
provider TEXT NOT NULL, -- apn
|
||||
provider TEXT NOT NULL, -- apns
|
||||
device_token TEXT NOT NULL, -- ! this field is mislabeled and is actually saved as binary
|
||||
ntf_host TEXT NOT NULL,
|
||||
ntf_port TEXT NOT NULL,
|
||||
|
||||
@@ -80,7 +80,6 @@ data NtfTknAction
|
||||
= NTARegister
|
||||
| NTAVerify NtfRegCode -- code to verify token
|
||||
| NTACheck
|
||||
| NTACron Word16
|
||||
| NTADelete
|
||||
deriving (Show)
|
||||
|
||||
@@ -89,14 +88,12 @@ instance Encoding NtfTknAction where
|
||||
NTARegister -> "R"
|
||||
NTAVerify code -> smpEncode ('V', code)
|
||||
NTACheck -> "C"
|
||||
NTACron interval -> smpEncode ('I', interval)
|
||||
NTADelete -> "D"
|
||||
smpP =
|
||||
A.anyChar >>= \case
|
||||
'R' -> pure NTARegister
|
||||
'V' -> NTAVerify <$> smpP
|
||||
'C' -> pure NTACheck
|
||||
'I' -> NTACron <$> smpP
|
||||
'D' -> pure NTADelete
|
||||
_ -> fail "bad NtfTknAction"
|
||||
|
||||
|
||||
@@ -319,7 +319,9 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
|
||||
TDEL -> do
|
||||
logDebug "TDEL"
|
||||
st <- asks store
|
||||
atomically $ deleteNtfToken st tknId
|
||||
qs <- atomically $ deleteNtfToken st tknId
|
||||
forM_ qs $ \SMPQueueNtf {smpServer, notifierId} ->
|
||||
atomically $ removeSubscription ca smpServer (SPNotifier, notifierId)
|
||||
cancelInvervalNotifications tknId
|
||||
pure NROk
|
||||
TCRN 0 -> do
|
||||
@@ -371,9 +373,8 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
|
||||
SDEL -> do
|
||||
logDebug "SDEL"
|
||||
st <- asks store
|
||||
atomically $ do
|
||||
deleteNtfSubscription st subId
|
||||
removeSubscription ca smpServer (SPNotifier, notifierId)
|
||||
atomically $ deleteNtfSubscription st subId
|
||||
atomically $ removeSubscription ca smpServer (SPNotifier, notifierId)
|
||||
pure NROk
|
||||
PING -> pure NRPong
|
||||
getId :: m NtfEntityId
|
||||
|
||||
@@ -12,7 +12,9 @@ module Simplex.Messaging.Notifications.Server.Store where
|
||||
import Control.Concurrent.STM
|
||||
import Control.Monad
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import Data.Functor (($>))
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (catMaybes)
|
||||
import Data.Set (Set)
|
||||
import qualified Data.Set as S
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
@@ -121,8 +123,7 @@ removeTokenRegistration st NtfTknData {ntfTknId = tId, token, tknVerifyKey} =
|
||||
>>= mapM_ (\tId' -> when (tId == tId') $ TM.delete k regs)
|
||||
k = C.toPubKey C.pubKeyBytes tknVerifyKey
|
||||
|
||||
-- TODO delete token subscriptions
|
||||
deleteNtfToken :: NtfStore -> NtfTokenId -> STM ()
|
||||
deleteNtfToken :: NtfStore -> NtfTokenId -> STM [SMPQueueNtf]
|
||||
deleteNtfToken st tknId = do
|
||||
TM.lookupDelete tknId (tokens st)
|
||||
>>= mapM_
|
||||
@@ -134,6 +135,22 @@ deleteNtfToken st tknId = do
|
||||
whenM (TM.null tIds) $ TM.delete token regs
|
||||
)
|
||||
)
|
||||
|
||||
qs <-
|
||||
TM.lookupDelete tknId (tokenSubscriptions st)
|
||||
>>= mapM
|
||||
( readTVar
|
||||
>=> mapM
|
||||
( \subId -> do
|
||||
TM.lookupDelete subId (subscriptions st)
|
||||
>>= mapM
|
||||
( \NtfSubData {smpQueue} ->
|
||||
TM.delete smpQueue (subscriptionLookup st) $> smpQueue
|
||||
)
|
||||
)
|
||||
. S.toList
|
||||
)
|
||||
pure $ maybe [] catMaybes qs
|
||||
where
|
||||
regs = tokenRegistrations st
|
||||
regKey = C.toPubKey C.pubKeyBytes
|
||||
@@ -187,39 +204,3 @@ deleteNtfSubscription st subId = do
|
||||
ts_ <- TM.lookup tokenId (tokenSubscriptions st)
|
||||
forM_ ts_ $ \ts -> modifyTVar' ts $ S.delete subId
|
||||
)
|
||||
|
||||
-- getNtfRec :: NtfStore -> SNtfEntity e -> NtfEntityId -> STM (Maybe (NtfEntityRec e))
|
||||
-- getNtfRec st ent entId = case ent of
|
||||
-- SToken -> NtfTkn <$$> TM.lookup entId (tokens st)
|
||||
-- SSubscription -> pure Nothing
|
||||
|
||||
-- getNtfVerifyKey :: NtfStore -> SNtfEntity e -> NtfEntityId -> STM (Maybe (NtfEntityRec e, C.APublicVerifyKey))
|
||||
-- getNtfVerifyKey st ent entId =
|
||||
-- getNtfRec st ent entId >>= \case
|
||||
-- Just r@(NtfTkn NtfTknData {tknVerifyKey}) -> pure $ Just (r, tknVerifyKey)
|
||||
-- Just r@(NtfSub NtfSubData {tokenId}) ->
|
||||
-- getNtfRec st SToken tokenId >>= \case
|
||||
-- Just (NtfTkn NtfTknData {tknVerifyKey}) -> pure $ Just (r, tknVerifyKey)
|
||||
-- _ -> pure Nothing
|
||||
-- _ -> pure Nothing
|
||||
|
||||
-- mkNtfSubsciption :: SMPQueueNtf -> NtfTokenId -> STM NtfSubsciption
|
||||
-- mkNtfSubsciption smpQueue tokenId = do
|
||||
-- subStatus <- newTVar NSNew
|
||||
-- pure NtfSubsciption {smpQueue, tokenId, subStatus}
|
||||
|
||||
-- getNtfSub :: NtfSubscriptionsStore -> NtfSubsciptionId -> STM (Maybe NtfSubsciption)
|
||||
-- getNtfSub st subId = pure Nothing -- maybe (pure $ Left AUTH) (fmap Right . readTVar) . M.lookup subId . subscriptions =<< readTVar st
|
||||
|
||||
-- getNtfSubViaSMPQueue :: NtfSubscriptionsStore -> SMPQueueNtf -> STM (Maybe NtfSubsciption)
|
||||
-- getNtfSubViaSMPQueue st smpQueue = pure Nothing
|
||||
|
||||
-- -- replace keeping status
|
||||
-- updateNtfSub :: NtfSubscriptionsStore -> NtfSubsciption -> SMPQueueNtf -> NtfTokenId -> C.DhSecretX25519 -> STM (Maybe ())
|
||||
-- updateNtfSub st sub smpQueue tokenId dhSecret = pure Nothing
|
||||
|
||||
-- addNtfSub :: NtfSubscriptionsStore -> NtfSubsciptionId -> NtfSubsciption -> STM (Maybe ())
|
||||
-- addNtfSub st subId sub = pure Nothing
|
||||
|
||||
-- deleteNtfSub :: NtfSubscriptionsStore -> NtfSubsciptionId -> STM ()
|
||||
-- deleteNtfSub st subId = pure ()
|
||||
|
||||
@@ -31,13 +31,18 @@ import Simplex.Messaging.Protocol (ErrorType (AUTH), MsgFlags (MsgFlags), SMPMsg
|
||||
import qualified Simplex.Messaging.Protocol as SMP
|
||||
import Simplex.Messaging.Transport (ATransport)
|
||||
import Simplex.Messaging.Util (tryE)
|
||||
import System.Directory (removeFile)
|
||||
import System.Directory (doesFileExist, removeFile)
|
||||
import Test.Hspec
|
||||
import UnliftIO
|
||||
|
||||
removeFileIfExists :: FilePath -> IO ()
|
||||
removeFileIfExists filePath = do
|
||||
fileExists <- doesFileExist filePath
|
||||
when fileExists $ removeFile filePath
|
||||
|
||||
notificationTests :: ATransport -> Spec
|
||||
notificationTests t =
|
||||
after_ (removeFile testDB) $ do
|
||||
after_ (removeFile testDB >> removeFileIfExists testDB2) $ do
|
||||
describe "Managing notification tokens" $ do
|
||||
it "should register and verify notification token" $
|
||||
withAPNSMockServer $ \apns ->
|
||||
@@ -60,6 +65,10 @@ notificationTests t =
|
||||
withSmpServer t $
|
||||
withAPNSMockServer $ \apns ->
|
||||
withNtfServer t $ testNotificationSubscriptionNewConnection apns
|
||||
it "should change notifications mode" $ \_ ->
|
||||
withSmpServer t $
|
||||
withAPNSMockServer $ \apns ->
|
||||
withNtfServer t $ testChangeNotificationsMode apns
|
||||
|
||||
testNotificationToken :: APNSMockServer -> IO ()
|
||||
testNotificationToken APNSMockServer {apnsQ} = do
|
||||
@@ -73,7 +82,6 @@ testNotificationToken APNSMockServer {apnsQ} = do
|
||||
nonce <- C.cbNonce <$> ntfData .-> "nonce"
|
||||
liftIO $ sendApnsResponse APNSRespOk
|
||||
verifyNtfToken a tkn verification nonce
|
||||
enableNtfCron a tkn 30
|
||||
NTActive <- checkNtfToken a tkn
|
||||
deleteNtfToken a tkn
|
||||
-- agent deleted this token
|
||||
@@ -110,7 +118,6 @@ testNtfTokenRepeatRegistration APNSMockServer {apnsQ} = do
|
||||
liftIO $ sendApnsResponse' APNSRespOk
|
||||
-- can still use the first verification code, it is the same after decryption
|
||||
verifyNtfToken a tkn verification nonce
|
||||
enableNtfCron a tkn 30
|
||||
NTActive <- checkNtfToken a tkn
|
||||
pure ()
|
||||
pure ()
|
||||
@@ -149,7 +156,6 @@ testNtfTokenSecondRegistration APNSMockServer {apnsQ} = do
|
||||
Left (NTF AUTH) <- tryE $ checkNtfToken a tkn
|
||||
-- and the second is active
|
||||
NTActive <- checkNtfToken a' tkn
|
||||
enableNtfCron a' tkn 30
|
||||
pure ()
|
||||
pure ()
|
||||
|
||||
@@ -180,7 +186,7 @@ testNtfTokenServerRestart t APNSMockServer {apnsQ} = do
|
||||
liftIO $ sendApnsResponse' APNSRespOk
|
||||
verifyNtfToken a' tkn verification' nonce'
|
||||
NTActive <- checkNtfToken a' tkn
|
||||
enableNtfCron a' tkn 30
|
||||
pure ()
|
||||
pure ()
|
||||
|
||||
testNotificationSubscriptionExistingConnection :: APNSMockServer -> IO ()
|
||||
@@ -248,7 +254,7 @@ testNotificationSubscriptionNewConnection APNSMockServer {apnsQ} = do
|
||||
Right () <- runExceptT $ do
|
||||
-- alice registers notification token
|
||||
let aliceTkn = DeviceToken PPApns "abcd"
|
||||
NTRegistered <- registerNtfToken alice aliceTkn NMPeriodic
|
||||
NTRegistered <- registerNtfToken alice aliceTkn NMInstant
|
||||
APNSMockRequest {notification = APNSNotification {aps = APNSBackground _, notificationData = Just ntfData}, sendApnsResponse} <-
|
||||
atomically $ readTBQueue apnsQ
|
||||
verification <- ntfData .-> "verification"
|
||||
@@ -258,7 +264,7 @@ testNotificationSubscriptionNewConnection APNSMockServer {apnsQ} = do
|
||||
NTActive <- checkNtfToken alice aliceTkn
|
||||
-- bob registers notification token
|
||||
let bobTkn = DeviceToken PPApns "bcde"
|
||||
NTRegistered <- registerNtfToken bob bobTkn NMPeriodic
|
||||
NTRegistered <- registerNtfToken bob bobTkn NMInstant
|
||||
APNSMockRequest {notification = APNSNotification {aps = APNSBackground _, notificationData = Just ntfData'}, sendApnsResponse = sendApnsResponse'} <-
|
||||
atomically $ readTBQueue apnsQ
|
||||
verification' <- ntfData' .-> "verification"
|
||||
@@ -302,6 +308,60 @@ testNotificationSubscriptionNewConnection APNSMockServer {apnsQ} = do
|
||||
baseId = 3
|
||||
msgId = subtract baseId
|
||||
|
||||
testChangeNotificationsMode :: APNSMockServer -> IO ()
|
||||
testChangeNotificationsMode APNSMockServer {apnsQ} = do
|
||||
alice <- getSMPAgentClient agentCfg initAgentServers
|
||||
bob <- getSMPAgentClient agentCfg {dbFile = testDB2} initAgentServers
|
||||
Right () <- runExceptT $ do
|
||||
-- establish connection
|
||||
(bobId, qInfo) <- createConnection alice SCMInvitation
|
||||
aliceId <- joinConnection bob qInfo "bob's connInfo"
|
||||
("", _, CONF confId "bob's connInfo") <- get alice
|
||||
allowConnection alice bobId confId "alice's connInfo"
|
||||
get bob ##> ("", aliceId, INFO "alice's connInfo")
|
||||
get alice ##> ("", bobId, CON)
|
||||
get bob ##> ("", aliceId, CON)
|
||||
-- register notification token, set mode to NMPeriodic
|
||||
let tkn = DeviceToken PPApns "abcd"
|
||||
NTRegistered <- registerNtfToken alice tkn NMPeriodic
|
||||
APNSMockRequest {notification = APNSNotification {aps = APNSBackground _, notificationData = Just ntfData}, sendApnsResponse} <-
|
||||
atomically $ readTBQueue apnsQ
|
||||
verification <- ntfData .-> "verification"
|
||||
verificationNonce <- C.cbNonce <$> ntfData .-> "nonce"
|
||||
liftIO $ sendApnsResponse APNSRespOk
|
||||
verifyNtfToken alice tkn verification verificationNonce
|
||||
NTActive <- checkNtfToken alice tkn
|
||||
-- send message, no notification
|
||||
1 <- msgId <$> sendMessage bob aliceId (SMP.MsgFlags True) "hello"
|
||||
get bob ##> ("", aliceId, SENT $ baseId + 1)
|
||||
get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False
|
||||
ackMessage alice bobId $ baseId + 1
|
||||
-- set mode to NMInstant
|
||||
NTActive <- registerNtfToken alice tkn NMInstant
|
||||
-- send message, receive notification
|
||||
liftIO $ threadDelay 500000
|
||||
2 <- msgId <$> sendMessage bob aliceId (SMP.MsgFlags True) "hello again"
|
||||
get bob ##> ("", aliceId, SENT $ baseId + 2)
|
||||
void $ messageNotification apnsQ
|
||||
get alice =##> \case ("", c, Msg "hello again") -> c == bobId; _ -> False
|
||||
ackMessage alice bobId $ baseId + 2
|
||||
-- reset mode to NMPeriodic
|
||||
NTActive <- registerNtfToken alice tkn NMPeriodic
|
||||
-- send message, no notification
|
||||
liftIO $ threadDelay 500000
|
||||
3 <- msgId <$> sendMessage bob aliceId (SMP.MsgFlags True) "hello there"
|
||||
get bob ##> ("", aliceId, SENT $ baseId + 3)
|
||||
get alice =##> \case ("", c, Msg "hello there") -> c == bobId; _ -> False
|
||||
ackMessage alice bobId $ baseId + 3
|
||||
-- no notifications should follow
|
||||
500000 `timeout` atomically (readTBQueue apnsQ) >>= \case
|
||||
Nothing -> pure ()
|
||||
_ -> error "unexpected notification"
|
||||
pure ()
|
||||
where
|
||||
baseId = 3
|
||||
msgId = subtract baseId
|
||||
|
||||
messageNotification :: TBQueue APNSMockRequest -> ExceptT AgentErrorType IO (C.CbNonce, ByteString)
|
||||
messageNotification apnsQ = do
|
||||
500000 `timeout` atomically (readTBQueue apnsQ) >>= \case
|
||||
|
||||
Reference in New Issue
Block a user