agent: do not reuse notifier credentials when re-creating notification subscription; on deleting ntf sub delete directly on smp server skipping ntf server (#1311)

* agent: change notifier credentials when re-creating notification subscription

* skip ndel

* enable tests

* fix race

* Revert "fix race"

This reverts commit ed9b18e8a7.

* delete record

* rename

---------

Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>
This commit is contained in:
spaced4ndy
2024-09-12 14:02:51 +04:00
committed by GitHub
parent 896b2425a4
commit e247f691cb
5 changed files with 33 additions and 44 deletions

View File

@@ -1018,7 +1018,7 @@ subscribeConnections' c connIds = do
let oks = M.keysSet $ M.filter (either temporaryAgentError $ const True) rcvRs
forM_ (M.restrictKeys cs oks) $ \case
SomeConn _ conn -> do
let cmd = if enableNtfs $ toConnData conn then NSCCreate else NSCDelete
let cmd = if enableNtfs $ toConnData conn then NSCCreate else NSCSmpDelete
ConnData {connId} = toConnData conn
atomically $ writeTBQueue (ntfSubQ ns) (connId, cmd)
resumeDelivery :: Map ConnId SomeConn -> AM ()
@@ -1720,7 +1720,7 @@ disableConn :: AgentClient -> ConnId -> AM' ()
disableConn c connId = do
atomically $ removeSubscription c connId
ns <- asks ntfSupervisor
atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCDelete)
atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCDeleteSub)
-- Unlike deleteConnectionsAsync, this function does not mark connections as deleted in case of deletion failure.
deleteConnections' :: AgentClient -> [ConnId] -> AM (Map ConnId (Either AgentErrorType ()))
@@ -1902,7 +1902,7 @@ registerNtfToken' c suppliedDeviceToken suppliedNtfMode =
cron <- asks $ ntfCron . config
agentNtfEnableCron c tknId tkn cron
when (suppliedNtfMode == NMInstant) $ initializeNtfSubs c
when (suppliedNtfMode == NMPeriodic && savedNtfMode == NMInstant) $ deleteNtfSubs c NSCDelete
when (suppliedNtfMode == NMPeriodic && savedNtfMode == NMInstant) $ deleteNtfSubs c NSCSmpDelete
-- possible improvement: get updated token status from the server, or maybe TCRON could return the current status
pure ntfTknStatus
| otherwise -> replaceToken tknId
@@ -2017,7 +2017,7 @@ toggleConnectionNtfs' c connId enable = do
| otherwise = do
withStore' c $ \db -> setConnectionNtfs db connId enable
ns <- asks ntfSupervisor
let cmd = if enable then NSCCreate else NSCDelete
let cmd = if enable then NSCCreate else NSCSmpDelete
atomically $ sendNtfSubCommand ns (connId, cmd)
deleteToken_ :: AgentClient -> NtfToken -> AM ()

View File

@@ -259,7 +259,7 @@ data NtfSupervisor = NtfSupervisor
ntfSMPWorkers :: TMap SMPServer Worker
}
data NtfSupervisorCommand = NSCCreate | NSCDelete | NSCSmpDelete | NSCNtfWorker NtfServer | NSCNtfSMPWorker SMPServer
data NtfSupervisorCommand = NSCCreate | NSCSmpDelete | NSCNtfWorker NtfServer | NSCNtfSMPWorker SMPServer | NSCDeleteSub
deriving (Show)
newNtfSubSupervisor :: Natural -> IO NtfSupervisor

View File

@@ -73,22 +73,16 @@ processNtfSub c (connId, cmd) = do
case a of
Nothing -> do
withTokenServer $ \ntfServer -> do
case clientNtfCreds of
Just ClientNtfCreds {notifierId} -> do
let newSub = newNtfSubscription userId connId smpServer (Just notifierId) ntfServer NASKey
withStore c $ \db -> createNtfSubscription db newSub $ NSANtf NSACreate
lift . void $ getNtfNTFWorker True c ntfServer
Nothing -> do
let newSub = newNtfSubscription userId connId smpServer Nothing ntfServer NASNew
withStore c $ \db -> createNtfSubscription db newSub $ NSASMP NSASmpKey
lift . void $ getNtfSMPWorker True c smpServer
(Just (sub@NtfSubscription {ntfSubStatus, ntfServer = subNtfServer, smpServer = smpServer', ntfQueueId}, action_)) -> do
let newSub = newNtfSubscription userId connId smpServer Nothing ntfServer NASNew
withStore c $ \db -> createNtfSubscription db newSub $ NSASMP NSASmpKey
lift . void $ getNtfSMPWorker True c smpServer
(Just (sub@NtfSubscription {ntfServer = subNtfServer, smpServer = smpServer', ntfQueueId}, action_)) -> do
case (clientNtfCreds, ntfQueueId) of
(Just ClientNtfCreds {notifierId}, Just ntfQueueId')
| sameSrvAddr smpServer smpServer' && notifierId == ntfQueueId' -> create
| otherwise -> rotate
| otherwise -> resetSubscription
(Nothing, Nothing) -> create
_ -> rotate
_ -> resetSubscription
where
create :: AM ()
create = case action_ of
@@ -96,33 +90,17 @@ processNtfSub c (connId, cmd) = do
Nothing -> resetSubscription
Just (action, _)
-- subscription was marked for deletion / is being deleted
| isDeleteNtfSubAction action -> do
if ntfSubStatus == NASNew || ntfSubStatus == NASOff || ntfSubStatus == NASDeleted
then resetSubscription
else withTokenServer $ \ntfServer -> do
withStore' c $ \db -> supervisorUpdateNtfSub db sub {ntfServer} (NSANtf NSACreate)
lift . void $ getNtfNTFWorker True c ntfServer
| isDeleteNtfSubAction action -> resetSubscription
-- continue work on subscription (e.g. supervisor was repeatedly tasked with creating a subscription)
| otherwise -> case action of
NSANtf _ -> lift . void $ getNtfNTFWorker True c subNtfServer
NSASMP _ -> lift . void $ getNtfSMPWorker True c smpServer
rotate :: AM ()
rotate = do
withStore' c $ \db -> supervisorUpdateNtfSub db sub (NSANtf NSARotate)
lift . void $ getNtfNTFWorker True c subNtfServer
resetSubscription :: AM ()
resetSubscription =
withTokenServer $ \ntfServer -> do
let sub' = sub {ntfQueueId = Nothing, ntfServer, ntfSubId = Nothing, ntfSubStatus = NASNew}
let sub' = sub {smpServer, ntfQueueId = Nothing, ntfServer, ntfSubId = Nothing, ntfSubStatus = NASNew}
withStore' c $ \db -> supervisorUpdateNtfSub db sub' (NSASMP NSASmpKey)
lift . void $ getNtfSMPWorker True c smpServer
NSCDelete -> do
sub_ <- withStore' c $ \db -> do
supervisorUpdateNtfAction db connId (NSANtf NSADelete)
getNtfSubscription db connId
logInfo $ "processNtfSub, NSCDelete - sub_ = " <> tshow sub_
case sub_ of
(Just (NtfSubscription {ntfServer}, _)) -> lift . void $ getNtfNTFWorker True c ntfServer
_ -> pure () -- err "NSCDelete - no subscription"
NSCSmpDelete -> do
withStore' c (`getPrimaryRcvQueue` connId) >>= \case
Right rq@RcvQueue {server = smpServer} -> do
@@ -132,6 +110,7 @@ processNtfSub c (connId, cmd) = do
_ -> notifyInternalError c connId "NSCSmpDelete - no rcv queue"
NSCNtfWorker ntfServer -> lift . void $ getNtfNTFWorker True c ntfServer
NSCNtfSMPWorker smpServer -> lift . void $ getNtfSMPWorker True c smpServer
NSCDeleteSub -> withStore' c $ \db -> deleteNtfSubscription' db connId
getNtfNTFWorker :: Bool -> AgentClient -> NtfServer -> AM' Worker
getNtfNTFWorker hasWork c server = do
@@ -199,6 +178,7 @@ runNtfWorker c srv Worker {doWork} =
atomically $ incNtfServerStat c userId ntfServer ntfChecked
Nothing -> workerInternalError c connId "NSACheck - no subscription ID"
_ -> workerInternalError c connId "NSACheck - no active token"
-- NSADelete and NSARotate are deprecated, but their processing is kept for legacy db records
NSADelete ->
deleteNtfSub $ do
let sub' = sub {ntfSubId = Nothing, ntfSubStatus = NASOff}
@@ -211,6 +191,7 @@ runNtfWorker c srv Worker {doWork} =
ns <- asks ntfSupervisor
atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCCreate)
where
-- deleteNtfSub is only used in NSADelete and NSARotate, so also deprecated
deleteNtfSub continue = case ntfSubId of
Just nSubId ->
lift getNtfToken >>= \case

View File

@@ -158,6 +158,7 @@ module Simplex.Messaging.Agent.Store.SQLite
updateNtfSubscription,
setNullNtfSubscriptionAction,
deleteNtfSubscription,
deleteNtfSubscription',
getNextNtfSubNTFAction,
markNtfSubActionNtfFailed_, -- exported for tests
getNextNtfSubSMPAction,
@@ -1491,7 +1492,7 @@ getNtfSubscription db connId =
|]
(Only connId)
where
ntfSubscription ((userId, smpHost, smpPort, smpKeyHash, ntfHost, ntfPort, ntfKeyHash ) :. (ntfQueueId, ntfSubId, ntfSubStatus, ntfAction_, smpAction_, actionTs_)) =
ntfSubscription ((userId, smpHost, smpPort, smpKeyHash, ntfHost, ntfPort, ntfKeyHash) :. (ntfQueueId, ntfSubId, ntfSubStatus, ntfAction_, smpAction_, actionTs_)) =
let smpServer = SMPServer smpHost smpPort smpKeyHash
ntfServer = NtfServer ntfHost ntfPort ntfKeyHash
action = case (ntfAction_, smpAction_, actionTs_) of
@@ -1521,16 +1522,19 @@ createNtfSubscription db ntfSubscription action = runExceptT $ do
(ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction action
supervisorUpdateNtfSub :: DB.Connection -> NtfSubscription -> NtfSubAction -> IO ()
supervisorUpdateNtfSub db NtfSubscription {connId, ntfQueueId, ntfServer = (NtfServer ntfHost ntfPort _), ntfSubId, ntfSubStatus} action = do
supervisorUpdateNtfSub db NtfSubscription {connId, smpServer = (SMPServer smpHost smpPort _), ntfQueueId, ntfServer = (NtfServer ntfHost ntfPort _), ntfSubId, ntfSubStatus} action = do
ts <- getCurrentTime
DB.execute
db
[sql|
UPDATE ntf_subscriptions
SET smp_ntf_id = ?, ntf_host = ?, ntf_port = ?, ntf_sub_id = ?, ntf_sub_status = ?, ntf_sub_action = ?, ntf_sub_smp_action = ?, ntf_sub_action_ts = ?, updated_by_supervisor = ?, updated_at = ?
SET smp_host = ?, smp_port = ?, smp_ntf_id = ?, ntf_host = ?, ntf_port = ?, ntf_sub_id = ?,
ntf_sub_status = ?, ntf_sub_action = ?, ntf_sub_smp_action = ?, ntf_sub_action_ts = ?, updated_by_supervisor = ?, updated_at = ?
WHERE conn_id = ?
|]
((ntfQueueId, ntfHost, ntfPort, ntfSubId) :. (ntfSubStatus, ntfSubAction, ntfSubSMPAction, ts, True, ts, connId))
( (smpHost, smpPort, ntfQueueId, ntfHost, ntfPort, ntfSubId)
:. (ntfSubStatus, ntfSubAction, ntfSubSMPAction, ts, True, ts, connId)
)
where
(ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction action
@@ -1605,7 +1609,11 @@ deleteNtfSubscription db connId = do
WHERE conn_id = ?
|]
(Nothing :: Maybe SMP.NotifierId, Nothing :: Maybe NtfSubscriptionId, NASDeleted, False, updatedAt, connId)
else DB.execute db "DELETE FROM ntf_subscriptions WHERE conn_id = ?" (Only connId)
else deleteNtfSubscription' db connId
deleteNtfSubscription' :: DB.Connection -> ConnId -> IO ()
deleteNtfSubscription' db connId = do
DB.execute db "DELETE FROM ntf_subscriptions WHERE conn_id = ?" (Only connId)
getNextNtfSubNTFAction :: DB.Connection -> NtfServer -> IO (Either StoreError (Maybe (NtfSubscription, NtfSubNTFAction, NtfActionTs)))
getNextNtfSubNTFAction db ntfServer@(NtfServer ntfHost ntfPort _) =
@@ -2004,7 +2012,7 @@ setConnDeleted db waitDelivery connId
DB.execute db "UPDATE connections SET deleted = ? WHERE conn_id = ?" (True, connId)
setConnUserId :: DB.Connection -> UserId -> ConnId -> UserId -> IO ()
setConnUserId db oldUserId connId newUserId =
setConnUserId db oldUserId connId newUserId =
DB.execute db "UPDATE connections SET user_id = ? WHERE conn_id = ? and user_id = ?" (newUserId, connId, oldUserId)
setConnAgentVersion :: DB.Connection -> ConnId -> VersionSMPA -> IO ()

View File

@@ -99,8 +99,8 @@ type NtfActionTs = UTCTime
data NtfSubNTFAction
= NSACreate
| NSACheck
| NSADelete
| NSARotate
| NSADelete -- deprecated
| NSARotate -- deprecated
deriving (Show)
instance Encoding NtfSubNTFAction where