diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 1dc86d081..1a3565f2f 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -1018,7 +1018,7 @@ subscribeConnections' c connIds = do let oks = M.keysSet $ M.filter (either temporaryAgentError $ const True) rcvRs forM_ (M.restrictKeys cs oks) $ \case SomeConn _ conn -> do - let cmd = if enableNtfs $ toConnData conn then NSCCreate else NSCDelete + let cmd = if enableNtfs $ toConnData conn then NSCCreate else NSCSmpDelete ConnData {connId} = toConnData conn atomically $ writeTBQueue (ntfSubQ ns) (connId, cmd) resumeDelivery :: Map ConnId SomeConn -> AM () @@ -1720,7 +1720,7 @@ disableConn :: AgentClient -> ConnId -> AM' () disableConn c connId = do atomically $ removeSubscription c connId ns <- asks ntfSupervisor - atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCDelete) + atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCDeleteSub) -- Unlike deleteConnectionsAsync, this function does not mark connections as deleted in case of deletion failure. deleteConnections' :: AgentClient -> [ConnId] -> AM (Map ConnId (Either AgentErrorType ())) @@ -1902,7 +1902,7 @@ registerNtfToken' c suppliedDeviceToken suppliedNtfMode = cron <- asks $ ntfCron . config agentNtfEnableCron c tknId tkn cron when (suppliedNtfMode == NMInstant) $ initializeNtfSubs c - when (suppliedNtfMode == NMPeriodic && savedNtfMode == NMInstant) $ deleteNtfSubs c NSCDelete + when (suppliedNtfMode == NMPeriodic && savedNtfMode == NMInstant) $ deleteNtfSubs c NSCSmpDelete -- possible improvement: get updated token status from the server, or maybe TCRON could return the current status pure ntfTknStatus | otherwise -> replaceToken tknId @@ -2017,7 +2017,7 @@ toggleConnectionNtfs' c connId enable = do | otherwise = do withStore' c $ \db -> setConnectionNtfs db connId enable ns <- asks ntfSupervisor - let cmd = if enable then NSCCreate else NSCDelete + let cmd = if enable then NSCCreate else NSCSmpDelete atomically $ sendNtfSubCommand ns (connId, cmd) deleteToken_ :: AgentClient -> NtfToken -> AM () diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index bc5e800c5..c10ba91ca 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -259,7 +259,7 @@ data NtfSupervisor = NtfSupervisor ntfSMPWorkers :: TMap SMPServer Worker } -data NtfSupervisorCommand = NSCCreate | NSCDelete | NSCSmpDelete | NSCNtfWorker NtfServer | NSCNtfSMPWorker SMPServer +data NtfSupervisorCommand = NSCCreate | NSCSmpDelete | NSCNtfWorker NtfServer | NSCNtfSMPWorker SMPServer | NSCDeleteSub deriving (Show) newNtfSubSupervisor :: Natural -> IO NtfSupervisor diff --git a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs index 23a88ea70..fa6291a00 100644 --- a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs +++ b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs @@ -73,22 +73,16 @@ processNtfSub c (connId, cmd) = do case a of Nothing -> do withTokenServer $ \ntfServer -> do - case clientNtfCreds of - Just ClientNtfCreds {notifierId} -> do - let newSub = newNtfSubscription userId connId smpServer (Just notifierId) ntfServer NASKey - withStore c $ \db -> createNtfSubscription db newSub $ NSANtf NSACreate - lift . void $ getNtfNTFWorker True c ntfServer - Nothing -> do - let newSub = newNtfSubscription userId connId smpServer Nothing ntfServer NASNew - withStore c $ \db -> createNtfSubscription db newSub $ NSASMP NSASmpKey - lift . void $ getNtfSMPWorker True c smpServer - (Just (sub@NtfSubscription {ntfSubStatus, ntfServer = subNtfServer, smpServer = smpServer', ntfQueueId}, action_)) -> do + let newSub = newNtfSubscription userId connId smpServer Nothing ntfServer NASNew + withStore c $ \db -> createNtfSubscription db newSub $ NSASMP NSASmpKey + lift . void $ getNtfSMPWorker True c smpServer + (Just (sub@NtfSubscription {ntfServer = subNtfServer, smpServer = smpServer', ntfQueueId}, action_)) -> do case (clientNtfCreds, ntfQueueId) of (Just ClientNtfCreds {notifierId}, Just ntfQueueId') | sameSrvAddr smpServer smpServer' && notifierId == ntfQueueId' -> create - | otherwise -> rotate + | otherwise -> resetSubscription (Nothing, Nothing) -> create - _ -> rotate + _ -> resetSubscription where create :: AM () create = case action_ of @@ -96,33 +90,17 @@ processNtfSub c (connId, cmd) = do Nothing -> resetSubscription Just (action, _) -- subscription was marked for deletion / is being deleted - | isDeleteNtfSubAction action -> do - if ntfSubStatus == NASNew || ntfSubStatus == NASOff || ntfSubStatus == NASDeleted - then resetSubscription - else withTokenServer $ \ntfServer -> do - withStore' c $ \db -> supervisorUpdateNtfSub db sub {ntfServer} (NSANtf NSACreate) - lift . void $ getNtfNTFWorker True c ntfServer + | isDeleteNtfSubAction action -> resetSubscription + -- continue work on subscription (e.g. supervisor was repeatedly tasked with creating a subscription) | otherwise -> case action of NSANtf _ -> lift . void $ getNtfNTFWorker True c subNtfServer NSASMP _ -> lift . void $ getNtfSMPWorker True c smpServer - rotate :: AM () - rotate = do - withStore' c $ \db -> supervisorUpdateNtfSub db sub (NSANtf NSARotate) - lift . void $ getNtfNTFWorker True c subNtfServer resetSubscription :: AM () resetSubscription = withTokenServer $ \ntfServer -> do - let sub' = sub {ntfQueueId = Nothing, ntfServer, ntfSubId = Nothing, ntfSubStatus = NASNew} + let sub' = sub {smpServer, ntfQueueId = Nothing, ntfServer, ntfSubId = Nothing, ntfSubStatus = NASNew} withStore' c $ \db -> supervisorUpdateNtfSub db sub' (NSASMP NSASmpKey) lift . void $ getNtfSMPWorker True c smpServer - NSCDelete -> do - sub_ <- withStore' c $ \db -> do - supervisorUpdateNtfAction db connId (NSANtf NSADelete) - getNtfSubscription db connId - logInfo $ "processNtfSub, NSCDelete - sub_ = " <> tshow sub_ - case sub_ of - (Just (NtfSubscription {ntfServer}, _)) -> lift . void $ getNtfNTFWorker True c ntfServer - _ -> pure () -- err "NSCDelete - no subscription" NSCSmpDelete -> do withStore' c (`getPrimaryRcvQueue` connId) >>= \case Right rq@RcvQueue {server = smpServer} -> do @@ -132,6 +110,7 @@ processNtfSub c (connId, cmd) = do _ -> notifyInternalError c connId "NSCSmpDelete - no rcv queue" NSCNtfWorker ntfServer -> lift . void $ getNtfNTFWorker True c ntfServer NSCNtfSMPWorker smpServer -> lift . void $ getNtfSMPWorker True c smpServer + NSCDeleteSub -> withStore' c $ \db -> deleteNtfSubscription' db connId getNtfNTFWorker :: Bool -> AgentClient -> NtfServer -> AM' Worker getNtfNTFWorker hasWork c server = do @@ -199,6 +178,7 @@ runNtfWorker c srv Worker {doWork} = atomically $ incNtfServerStat c userId ntfServer ntfChecked Nothing -> workerInternalError c connId "NSACheck - no subscription ID" _ -> workerInternalError c connId "NSACheck - no active token" + -- NSADelete and NSARotate are deprecated, but their processing is kept for legacy db records NSADelete -> deleteNtfSub $ do let sub' = sub {ntfSubId = Nothing, ntfSubStatus = NASOff} @@ -211,6 +191,7 @@ runNtfWorker c srv Worker {doWork} = ns <- asks ntfSupervisor atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCCreate) where + -- deleteNtfSub is only used in NSADelete and NSARotate, so also deprecated deleteNtfSub continue = case ntfSubId of Just nSubId -> lift getNtfToken >>= \case diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index d3eae9354..f354ad530 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -158,6 +158,7 @@ module Simplex.Messaging.Agent.Store.SQLite updateNtfSubscription, setNullNtfSubscriptionAction, deleteNtfSubscription, + deleteNtfSubscription', getNextNtfSubNTFAction, markNtfSubActionNtfFailed_, -- exported for tests getNextNtfSubSMPAction, @@ -1491,7 +1492,7 @@ getNtfSubscription db connId = |] (Only connId) where - ntfSubscription ((userId, smpHost, smpPort, smpKeyHash, ntfHost, ntfPort, ntfKeyHash ) :. (ntfQueueId, ntfSubId, ntfSubStatus, ntfAction_, smpAction_, actionTs_)) = + ntfSubscription ((userId, smpHost, smpPort, smpKeyHash, ntfHost, ntfPort, ntfKeyHash) :. (ntfQueueId, ntfSubId, ntfSubStatus, ntfAction_, smpAction_, actionTs_)) = let smpServer = SMPServer smpHost smpPort smpKeyHash ntfServer = NtfServer ntfHost ntfPort ntfKeyHash action = case (ntfAction_, smpAction_, actionTs_) of @@ -1521,16 +1522,19 @@ createNtfSubscription db ntfSubscription action = runExceptT $ do (ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction action supervisorUpdateNtfSub :: DB.Connection -> NtfSubscription -> NtfSubAction -> IO () -supervisorUpdateNtfSub db NtfSubscription {connId, ntfQueueId, ntfServer = (NtfServer ntfHost ntfPort _), ntfSubId, ntfSubStatus} action = do +supervisorUpdateNtfSub db NtfSubscription {connId, smpServer = (SMPServer smpHost smpPort _), ntfQueueId, ntfServer = (NtfServer ntfHost ntfPort _), ntfSubId, ntfSubStatus} action = do ts <- getCurrentTime DB.execute db [sql| UPDATE ntf_subscriptions - SET smp_ntf_id = ?, ntf_host = ?, ntf_port = ?, ntf_sub_id = ?, ntf_sub_status = ?, ntf_sub_action = ?, ntf_sub_smp_action = ?, ntf_sub_action_ts = ?, updated_by_supervisor = ?, updated_at = ? + SET smp_host = ?, smp_port = ?, smp_ntf_id = ?, ntf_host = ?, ntf_port = ?, ntf_sub_id = ?, + ntf_sub_status = ?, ntf_sub_action = ?, ntf_sub_smp_action = ?, ntf_sub_action_ts = ?, updated_by_supervisor = ?, updated_at = ? WHERE conn_id = ? |] - ((ntfQueueId, ntfHost, ntfPort, ntfSubId) :. (ntfSubStatus, ntfSubAction, ntfSubSMPAction, ts, True, ts, connId)) + ( (smpHost, smpPort, ntfQueueId, ntfHost, ntfPort, ntfSubId) + :. (ntfSubStatus, ntfSubAction, ntfSubSMPAction, ts, True, ts, connId) + ) where (ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction action @@ -1605,7 +1609,11 @@ deleteNtfSubscription db connId = do WHERE conn_id = ? |] (Nothing :: Maybe SMP.NotifierId, Nothing :: Maybe NtfSubscriptionId, NASDeleted, False, updatedAt, connId) - else DB.execute db "DELETE FROM ntf_subscriptions WHERE conn_id = ?" (Only connId) + else deleteNtfSubscription' db connId + +deleteNtfSubscription' :: DB.Connection -> ConnId -> IO () +deleteNtfSubscription' db connId = do + DB.execute db "DELETE FROM ntf_subscriptions WHERE conn_id = ?" (Only connId) getNextNtfSubNTFAction :: DB.Connection -> NtfServer -> IO (Either StoreError (Maybe (NtfSubscription, NtfSubNTFAction, NtfActionTs))) getNextNtfSubNTFAction db ntfServer@(NtfServer ntfHost ntfPort _) = @@ -2004,7 +2012,7 @@ setConnDeleted db waitDelivery connId DB.execute db "UPDATE connections SET deleted = ? WHERE conn_id = ?" (True, connId) setConnUserId :: DB.Connection -> UserId -> ConnId -> UserId -> IO () -setConnUserId db oldUserId connId newUserId = +setConnUserId db oldUserId connId newUserId = DB.execute db "UPDATE connections SET user_id = ? WHERE conn_id = ? and user_id = ?" (newUserId, connId, oldUserId) setConnAgentVersion :: DB.Connection -> ConnId -> VersionSMPA -> IO () diff --git a/src/Simplex/Messaging/Notifications/Types.hs b/src/Simplex/Messaging/Notifications/Types.hs index 8fcedab53..774f354bb 100644 --- a/src/Simplex/Messaging/Notifications/Types.hs +++ b/src/Simplex/Messaging/Notifications/Types.hs @@ -99,8 +99,8 @@ type NtfActionTs = UTCTime data NtfSubNTFAction = NSACreate | NSACheck - | NSADelete - | NSARotate + | NSADelete -- deprecated + | NSARotate -- deprecated deriving (Show) instance Encoding NtfSubNTFAction where