mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-06-04 00:01:27 +00:00
skip ndel
This commit is contained in:
@@ -1018,7 +1018,7 @@ subscribeConnections' c connIds = do
|
||||
let oks = M.keysSet $ M.filter (either temporaryAgentError $ const True) rcvRs
|
||||
forM_ (M.restrictKeys cs oks) $ \case
|
||||
SomeConn _ conn -> do
|
||||
let cmd = if enableNtfs $ toConnData conn then NSCCreate else NSCDelete
|
||||
let cmd = if enableNtfs $ toConnData conn then NSCCreate else NSCSmpDelete
|
||||
ConnData {connId} = toConnData conn
|
||||
atomically $ writeTBQueue (ntfSubQ ns) (connId, cmd)
|
||||
resumeDelivery :: Map ConnId SomeConn -> AM ()
|
||||
@@ -1720,7 +1720,7 @@ disableConn :: AgentClient -> ConnId -> AM' ()
|
||||
disableConn c connId = do
|
||||
atomically $ removeSubscription c connId
|
||||
ns <- asks ntfSupervisor
|
||||
atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCDelete)
|
||||
atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCSmpDelete)
|
||||
|
||||
-- Unlike deleteConnectionsAsync, this function does not mark connections as deleted in case of deletion failure.
|
||||
deleteConnections' :: AgentClient -> [ConnId] -> AM (Map ConnId (Either AgentErrorType ()))
|
||||
@@ -1902,7 +1902,7 @@ registerNtfToken' c suppliedDeviceToken suppliedNtfMode =
|
||||
cron <- asks $ ntfCron . config
|
||||
agentNtfEnableCron c tknId tkn cron
|
||||
when (suppliedNtfMode == NMInstant) $ initializeNtfSubs c
|
||||
when (suppliedNtfMode == NMPeriodic && savedNtfMode == NMInstant) $ deleteNtfSubs c NSCDelete
|
||||
when (suppliedNtfMode == NMPeriodic && savedNtfMode == NMInstant) $ deleteNtfSubs c NSCSmpDelete
|
||||
-- possible improvement: get updated token status from the server, or maybe TCRON could return the current status
|
||||
pure ntfTknStatus
|
||||
| otherwise -> replaceToken tknId
|
||||
@@ -2017,7 +2017,7 @@ toggleConnectionNtfs' c connId enable = do
|
||||
| otherwise = do
|
||||
withStore' c $ \db -> setConnectionNtfs db connId enable
|
||||
ns <- asks ntfSupervisor
|
||||
let cmd = if enable then NSCCreate else NSCDelete
|
||||
let cmd = if enable then NSCCreate else NSCSmpDelete
|
||||
atomically $ sendNtfSubCommand ns (connId, cmd)
|
||||
|
||||
deleteToken_ :: AgentClient -> NtfToken -> AM ()
|
||||
|
||||
@@ -259,7 +259,7 @@ data NtfSupervisor = NtfSupervisor
|
||||
ntfSMPWorkers :: TMap SMPServer Worker
|
||||
}
|
||||
|
||||
data NtfSupervisorCommand = NSCCreate | NSCDelete | NSCSmpDelete | NSCNtfWorker NtfServer | NSCNtfSMPWorker SMPServer
|
||||
data NtfSupervisorCommand = NSCCreate | NSCSmpDelete | NSCNtfWorker NtfServer | NSCNtfSMPWorker SMPServer
|
||||
deriving (Show)
|
||||
|
||||
newNtfSubSupervisor :: Natural -> IO NtfSupervisor
|
||||
|
||||
@@ -76,13 +76,13 @@ processNtfSub c (connId, cmd) = do
|
||||
let newSub = newNtfSubscription userId connId smpServer Nothing ntfServer NASNew
|
||||
withStore c $ \db -> createNtfSubscription db newSub $ NSASMP NSASmpKey
|
||||
lift . void $ getNtfSMPWorker True c smpServer
|
||||
(Just (sub@NtfSubscription {ntfSubStatus, ntfServer = subNtfServer, smpServer = smpServer', ntfQueueId}, action_)) -> do
|
||||
(Just (sub@NtfSubscription {ntfServer = subNtfServer, smpServer = smpServer', ntfQueueId}, action_)) -> do
|
||||
case (clientNtfCreds, ntfQueueId) of
|
||||
(Just ClientNtfCreds {notifierId}, Just ntfQueueId')
|
||||
| sameSrvAddr smpServer smpServer' && notifierId == ntfQueueId' -> create
|
||||
| otherwise -> rotate
|
||||
| otherwise -> resetSubscription
|
||||
(Nothing, Nothing) -> create
|
||||
_ -> rotate
|
||||
_ -> resetSubscription
|
||||
where
|
||||
create :: AM ()
|
||||
create = case action_ of
|
||||
@@ -90,33 +90,17 @@ processNtfSub c (connId, cmd) = do
|
||||
Nothing -> resetSubscription
|
||||
Just (action, _)
|
||||
-- subscription was marked for deletion / is being deleted
|
||||
| isDeleteNtfSubAction action -> do
|
||||
if ntfSubStatus == NASNew || ntfSubStatus == NASOff || ntfSubStatus == NASDeleted
|
||||
then resetSubscription
|
||||
else withTokenServer $ \ntfServer -> do
|
||||
withStore' c $ \db -> supervisorUpdateNtfSub db sub {ntfServer} (NSANtf NSACreate)
|
||||
lift . void $ getNtfNTFWorker True c ntfServer
|
||||
| isDeleteNtfSubAction action -> resetSubscription
|
||||
-- continue work on subscription (e.g. supervisor was repeatedly tasked with creating a subscription)
|
||||
| otherwise -> case action of
|
||||
NSANtf _ -> lift . void $ getNtfNTFWorker True c subNtfServer
|
||||
NSASMP _ -> lift . void $ getNtfSMPWorker True c smpServer
|
||||
rotate :: AM ()
|
||||
rotate = do
|
||||
withStore' c $ \db -> supervisorUpdateNtfSub db sub (NSANtf NSARotate)
|
||||
lift . void $ getNtfNTFWorker True c subNtfServer
|
||||
resetSubscription :: AM ()
|
||||
resetSubscription =
|
||||
withTokenServer $ \ntfServer -> do
|
||||
let sub' = sub {ntfQueueId = Nothing, ntfServer, ntfSubId = Nothing, ntfSubStatus = NASNew}
|
||||
let sub' = sub {smpServer, ntfQueueId = Nothing, ntfServer, ntfSubId = Nothing, ntfSubStatus = NASNew}
|
||||
withStore' c $ \db -> supervisorUpdateNtfSub db sub' (NSASMP NSASmpKey)
|
||||
lift . void $ getNtfSMPWorker True c smpServer
|
||||
NSCDelete -> do
|
||||
sub_ <- withStore' c $ \db -> do
|
||||
supervisorUpdateNtfAction db connId (NSANtf NSADelete)
|
||||
getNtfSubscription db connId
|
||||
logInfo $ "processNtfSub, NSCDelete - sub_ = " <> tshow sub_
|
||||
case sub_ of
|
||||
(Just (NtfSubscription {ntfServer}, _)) -> lift . void $ getNtfNTFWorker True c ntfServer
|
||||
_ -> pure () -- err "NSCDelete - no subscription"
|
||||
NSCSmpDelete -> do
|
||||
withStore' c (`getPrimaryRcvQueue` connId) >>= \case
|
||||
Right rq@RcvQueue {server = smpServer} -> do
|
||||
@@ -193,6 +177,7 @@ runNtfWorker c srv Worker {doWork} =
|
||||
atomically $ incNtfServerStat c userId ntfServer ntfChecked
|
||||
Nothing -> workerInternalError c connId "NSACheck - no subscription ID"
|
||||
_ -> workerInternalError c connId "NSACheck - no active token"
|
||||
-- NSADelete and NSARotate are deprecated, but their processing is kept for legacy db records
|
||||
NSADelete ->
|
||||
deleteNtfSub $ do
|
||||
let sub' = sub {ntfSubId = Nothing, ntfSubStatus = NASOff}
|
||||
@@ -205,6 +190,7 @@ runNtfWorker c srv Worker {doWork} =
|
||||
ns <- asks ntfSupervisor
|
||||
atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCCreate)
|
||||
where
|
||||
-- deleteNtfSub is only used in NSADelete and NSARotate, so also deprecated
|
||||
deleteNtfSub continue = case ntfSubId of
|
||||
Just nSubId ->
|
||||
lift getNtfToken >>= \case
|
||||
|
||||
@@ -1491,7 +1491,7 @@ getNtfSubscription db connId =
|
||||
|]
|
||||
(Only connId)
|
||||
where
|
||||
ntfSubscription ((userId, smpHost, smpPort, smpKeyHash, ntfHost, ntfPort, ntfKeyHash ) :. (ntfQueueId, ntfSubId, ntfSubStatus, ntfAction_, smpAction_, actionTs_)) =
|
||||
ntfSubscription ((userId, smpHost, smpPort, smpKeyHash, ntfHost, ntfPort, ntfKeyHash) :. (ntfQueueId, ntfSubId, ntfSubStatus, ntfAction_, smpAction_, actionTs_)) =
|
||||
let smpServer = SMPServer smpHost smpPort smpKeyHash
|
||||
ntfServer = NtfServer ntfHost ntfPort ntfKeyHash
|
||||
action = case (ntfAction_, smpAction_, actionTs_) of
|
||||
@@ -1521,16 +1521,19 @@ createNtfSubscription db ntfSubscription action = runExceptT $ do
|
||||
(ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction action
|
||||
|
||||
supervisorUpdateNtfSub :: DB.Connection -> NtfSubscription -> NtfSubAction -> IO ()
|
||||
supervisorUpdateNtfSub db NtfSubscription {connId, ntfQueueId, ntfServer = (NtfServer ntfHost ntfPort _), ntfSubId, ntfSubStatus} action = do
|
||||
supervisorUpdateNtfSub db NtfSubscription {connId, smpServer = (SMPServer smpHost smpPort _), ntfQueueId, ntfServer = (NtfServer ntfHost ntfPort _), ntfSubId, ntfSubStatus} action = do
|
||||
ts <- getCurrentTime
|
||||
DB.execute
|
||||
db
|
||||
[sql|
|
||||
UPDATE ntf_subscriptions
|
||||
SET smp_ntf_id = ?, ntf_host = ?, ntf_port = ?, ntf_sub_id = ?, ntf_sub_status = ?, ntf_sub_action = ?, ntf_sub_smp_action = ?, ntf_sub_action_ts = ?, updated_by_supervisor = ?, updated_at = ?
|
||||
SET smp_host = ?, smp_port = ?, smp_ntf_id = ?, ntf_host = ?, ntf_port = ?, ntf_sub_id = ?,
|
||||
ntf_sub_status = ?, ntf_sub_action = ?, ntf_sub_smp_action = ?, ntf_sub_action_ts = ?, updated_by_supervisor = ?, updated_at = ?
|
||||
WHERE conn_id = ?
|
||||
|]
|
||||
((ntfQueueId, ntfHost, ntfPort, ntfSubId) :. (ntfSubStatus, ntfSubAction, ntfSubSMPAction, ts, True, ts, connId))
|
||||
( (smpHost, smpPort, ntfQueueId, ntfHost, ntfPort, ntfSubId)
|
||||
:. (ntfSubStatus, ntfSubAction, ntfSubSMPAction, ts, True, ts, connId)
|
||||
)
|
||||
where
|
||||
(ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction action
|
||||
|
||||
@@ -2004,7 +2007,7 @@ setConnDeleted db waitDelivery connId
|
||||
DB.execute db "UPDATE connections SET deleted = ? WHERE conn_id = ?" (True, connId)
|
||||
|
||||
setConnUserId :: DB.Connection -> UserId -> ConnId -> UserId -> IO ()
|
||||
setConnUserId db oldUserId connId newUserId =
|
||||
setConnUserId db oldUserId connId newUserId =
|
||||
DB.execute db "UPDATE connections SET user_id = ? WHERE conn_id = ? and user_id = ?" (newUserId, connId, oldUserId)
|
||||
|
||||
setConnAgentVersion :: DB.Connection -> ConnId -> VersionSMPA -> IO ()
|
||||
|
||||
@@ -99,8 +99,8 @@ type NtfActionTs = UTCTime
|
||||
data NtfSubNTFAction
|
||||
= NSACreate
|
||||
| NSACheck
|
||||
| NSADelete
|
||||
| NSARotate
|
||||
| NSADelete -- deprecated
|
||||
| NSARotate -- deprecated
|
||||
deriving (Show)
|
||||
|
||||
instance Encoding NtfSubNTFAction where
|
||||
|
||||
+1
-1
@@ -24,6 +24,6 @@ agentTests (ATransport t) = do
|
||||
describe "Connection request" connectionRequestTests
|
||||
describe "Double ratchet tests" doubleRatchetTests
|
||||
describe "Functional API" $ functionalAPITests (ATransport t)
|
||||
describe "Notification tests" $ notificationTests (ATransport t)
|
||||
fdescribe "Notification tests" $ notificationTests (ATransport t)
|
||||
describe "SQLite store" storeTests
|
||||
describe "Migration tests" migrationTests
|
||||
|
||||
+1
-1
@@ -56,7 +56,7 @@ main = do
|
||||
describe "Util tests" utilTests
|
||||
describe "SMP server via TLS" $ serverTests (transport @TLS)
|
||||
describe "SMP server via WebSockets" $ serverTests (transport @WS)
|
||||
describe "Notifications server" $ ntfServerTests (transport @TLS)
|
||||
fdescribe "Notifications server" $ ntfServerTests (transport @TLS)
|
||||
describe "SMP client agent" $ agentTests (transport @TLS)
|
||||
describe "SMP proxy" smpProxyTests
|
||||
describe "XFTP" $ do
|
||||
|
||||
Reference in New Issue
Block a user