diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 81e60d5f2..cc7e38026 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -81,7 +81,6 @@ import Data.Maybe (catMaybes) import Data.Set (Set) import Data.Text.Encoding import Data.Word (Word16) -import Database.SQLite.Simple (SQLError) import qualified Database.SQLite.Simple as DB -- import GHC.Conc (unsafeIOToSTM) import Simplex.Messaging.Agent.Env.SQLite @@ -735,7 +734,7 @@ withStore c action = do atomically $ endAgentOperation c AODatabase liftEither $ first storeError r where - handleInternal :: SQLError -> IO (Either StoreError a) + handleInternal :: E.SomeException -> IO (Either StoreError a) handleInternal = pure . Left . SEInternal . bshow storeError :: StoreError -> AgentErrorType storeError = \case diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index f8199221b..469f543e8 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -29,6 +29,7 @@ import Control.Monad.Reader import Crypto.Random import Data.List.NonEmpty (NonEmpty) import Data.Time.Clock (NominalDiffTime, nominalDay) +import Data.Word (Word16) import Network.Socket import Numeric.Natural import Simplex.Messaging.Agent.Protocol @@ -46,7 +47,6 @@ import Simplex.Messaging.Version import System.Random (StdGen, newStdGen) import UnliftIO (Async) import UnliftIO.STM -import Data.Word (Word16) -- | Agent monad with MonadReader Env and MonadError AgentErrorType type AgentMonad m = (MonadUnliftIO m, MonadReader Env m, MonadError AgentErrorType m) @@ -142,6 +142,7 @@ data NtfSupervisor = NtfSupervisor } data NtfSupervisorCommand = NSCCreate | NSCDelete | NSCSmpDelete | NSCNtfWorker NtfServer | NSCNtfSMPWorker SMPServer + deriving (Show) newNtfSubSupervisor :: Natural -> STM NtfSupervisor newNtfSubSupervisor qSize = do diff --git a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs index f6226fb7d..76911ef13 100644 --- a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs +++ b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs @@ -4,6 +4,7 @@ {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TupleSections #-} module Simplex.Messaging.Agent.NtfSubSupervisor ( runNtfSupervisor, @@ -59,7 +60,6 @@ processNtfSub c (connId, cmd) = do ntfServer_ <- getNtfServer c case cmd of NSCCreate -> do - -- TODO merge getNtfSubscription and getRcvQueue into one method to read both in same transaction? (sub_, RcvQueue {server = smpServer, clientNtfCreds}) <- withStore c $ \db -> runExceptT $ do sub_ <- liftIO $ getNtfSubscription db connId q <- ExceptT $ getRcvQueue db connId @@ -69,11 +69,11 @@ processNtfSub c (connId, cmd) = do currentTime <- liftIO getCurrentTime case clientNtfCreds of Just ClientNtfCreds {notifierId} -> do - let newSub = newNtfSubscription connId smpServer (Just notifierId) ntfServer NASKey currentTime - withStore' c $ \db -> createNtfSubscription db newSub (NtfSubAction NSACreate) + let newSub = newNtfSubscription connId smpServer (Just notifierId) ntfServer NASKey + withStore' c $ \db -> createNtfSubscription db NtfSubActionData {action = NtfSubNTFAction NSACreate, actionTs = currentTime, ntfSubscription = newSub} _ -> do - let newSub = newNtfSubscription connId smpServer Nothing ntfServer NASNew currentTime - withStore' c $ \db -> createNtfSubscription db newSub (NtfSubSMPAction NSASmpKey) + let newSub = newNtfSubscription connId smpServer Nothing ntfServer NASNew + withStore' c $ \db -> createNtfSubscription db NtfSubActionData {action = NtfSubSMPAction NSASmpKey, actionTs = currentTime, ntfSubscription = newSub} -- TODO optimize? -- TODO - read action in getNtfSubscription and decide which worker to create -- TODO - SMP worker can create Ntf worker on NKEY completion @@ -90,7 +90,7 @@ processNtfSub c (connId, cmd) = do addNtfWorker ntfServer _ -> pure () -- error - notification server not configured NSCDelete -> do - withStore' c $ \db -> markNtfSubscriptionForDeletion db connId (NtfSubAction NSADelete) + withStore' c $ \db -> markNtfSubscriptionForDeletion db connId (NtfSubNTFAction NSADelete) case ntfServer_ of (Just ntfServer) -> addNtfWorker ntfServer _ -> pure () @@ -128,10 +128,10 @@ runNtfWorker c srv doWork = forever $ do nextSub_ <- withStore' c (`getNextNtfSubAction` srv) case nextSub_ of Nothing -> noWorkToDo - Just ntfSub@(NtfSubscription {connId}, _) -> do + Just a@NtfSubNTFActionData {ntfSubscription = NtfSubscription {connId}} -> do ri <- asks $ reconnectInterval . config withRetryInterval ri $ \loop -> - processAction ntfSub + processAction a `catchError` ( \e -> case e of BROKER NETWORK -> loop @@ -142,11 +142,11 @@ runNtfWorker c srv doWork = forever $ do liftIO $ threadDelay throttle where noWorkToDo = void . atomically $ tryTakeTMVar doWork - processAction :: (NtfSubscription, NtfSubAction) -> m () - processAction (ntfSub@NtfSubscription {connId, smpServer, ntfSubId}, ntfSubAction) = do + processAction :: NtfSubNTFActionData -> m () + processAction NtfSubNTFActionData {ntfAction, actionTs, ntfSubscription = sub@NtfSubscription {connId, smpServer, ntfSubId}} = do ts <- liftIO getCurrentTime - unlessM (rescheduleAction doWork ts ntfSub) $ - case ntfSubAction of + unlessM (rescheduleAction doWork ts actionTs) $ + case ntfAction of NSACreate -> getNtfToken >>= \case Just tkn@NtfToken {ntfTokenId = Just tknId, ntfTknStatus = NTActive, ntfMode = NMInstant} -> do @@ -154,9 +154,10 @@ runNtfWorker c srv doWork = forever $ do case clientNtfCreds of Just ClientNtfCreds {ntfPrivateKey, notifierId} -> do nSubId <- agentNtfCreateSubscription c tknId tkn (SMPQueueNtf smpServer notifierId) ntfPrivateKey - let actionTs = addUTCTime 30 ts + -- TODO smaller retry until Active, less frequently (daily?) once Active + let actionTs' = addUTCTime 30 ts withStore' c $ \db -> - updateNtfSubscription db connId ntfSub {ntfSubId = Just nSubId, ntfSubStatus = NASCreated NSNew, ntfSubActionTs = actionTs} (NtfSubAction NSACheck) + updateNtfSubscription db connId NtfSubActionData {ntfSubscription = sub {ntfSubId = Just nSubId, ntfSubStatus = NASCreated NSNew}, action = NtfSubNTFAction NSACheck, actionTs = actionTs'} _ -> ntfInternalError c connId "NSACreate - no notifier queue credentials" _ -> ntfInternalError c connId "NSACreate - no active token" NSACheck -> @@ -165,7 +166,7 @@ runNtfWorker c srv doWork = forever $ do case ntfSubId of Just nSubId -> agentNtfCheckSubscription c nSubId tkn >>= \case - NSSMPAuth -> updateSub (NASCreated NSSMPAuth) (NtfSubAction NSADelete) ts -- TODO re-create subscription? + NSSMPAuth -> updateSub (NASCreated NSSMPAuth) (NtfSubNTFAction NSADelete) ts -- TODO re-create subscription? status -> updateSubNextCheck ts status Nothing -> ntfInternalError c connId "NSACheck - no subscription ID" _ -> ntfInternalError c connId "NSACheck - no active token" @@ -174,7 +175,7 @@ runNtfWorker c srv doWork = forever $ do (getNtfToken >>= \tkn -> forM_ tkn $ agentNtfDeleteSubscription c nSubId) `E.finally` do withStore' c $ \db -> - updateNtfSubscription db connId ntfSub {ntfSubId = Nothing, ntfSubStatus = NASOff, ntfSubActionTs = ts} (NtfSubSMPAction NSASmpDelete) + updateNtfSubscription db connId NtfSubActionData {ntfSubscription = sub {ntfSubId = Nothing, ntfSubStatus = NASOff}, action = NtfSubSMPAction NSASmpDelete, actionTs = ts} ns <- asks ntfSupervisor atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCNtfSMPWorker smpServer) Nothing -> ntfInternalError c connId "NSADelete - no subscription ID" @@ -182,10 +183,10 @@ runNtfWorker c srv doWork = forever $ do updateSubNextCheck ts toStatus = do checkInterval <- asks $ ntfSubCheckInterval . config let nextCheckTs = addUTCTime checkInterval ts - updateSub (NASCreated toStatus) (NtfSubAction NSACheck) nextCheckTs - updateSub toStatus toAction actionTs = + updateSub (NASCreated toStatus) (NtfSubNTFAction NSACheck) nextCheckTs + updateSub toStatus toAction actionTs' = withStore' c $ \db -> - updateNtfSubscription db connId ntfSub {ntfSubStatus = toStatus, ntfSubActionTs = actionTs} toAction + updateNtfSubscription db connId NtfSubActionData {ntfSubscription = sub {ntfSubStatus = toStatus}, action = toAction, actionTs = actionTs'} runNtfSMPWorker :: forall m. AgentMonad m => AgentClient -> SMPServer -> TMVar () -> m () runNtfSMPWorker c srv doWork = forever $ do @@ -193,10 +194,10 @@ runNtfSMPWorker c srv doWork = forever $ do nextSub_ <- withStore' c (`getNextNtfSubSMPAction` srv) case nextSub_ of Nothing -> noWorkToDo - Just ntfSub@(NtfSubscription {connId}, _) -> do + Just a@NtfSubSMPActionData {ntfSubscription = NtfSubscription {connId}} -> do ri <- asks $ reconnectInterval . config withRetryInterval ri $ \loop -> - processAction ntfSub + processAction a `catchError` ( \e -> case e of BROKER NETWORK -> loop @@ -207,11 +208,11 @@ runNtfSMPWorker c srv doWork = forever $ do liftIO $ threadDelay throttle where noWorkToDo = void . atomically $ tryTakeTMVar doWork - processAction :: (NtfSubscription, NtfSubSMPAction) -> m () - processAction (ntfSub@NtfSubscription {connId, ntfServer}, ntfSubAction) = do + processAction :: NtfSubSMPActionData -> m () + processAction NtfSubSMPActionData {smpAction, actionTs, ntfSubscription = sub@NtfSubscription {connId, ntfServer}} = do ts <- liftIO getCurrentTime - unlessM (rescheduleAction doWork ts ntfSub) $ - case ntfSubAction of + unlessM (rescheduleAction doWork ts actionTs) $ + case smpAction of NSASmpKey -> getNtfToken >>= \case Just NtfToken {ntfTknStatus = NTActive, ntfMode = NMInstant} -> do @@ -223,7 +224,7 @@ runNtfSMPWorker c srv doWork = forever $ do let rcvNtfDhSecret = C.dh' rcvNtfSrvPubDhKey rcvNtfPrivDhKey withStore' c $ \st -> do setRcvQueueNtfCreds st connId ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret} - updateNtfSubscription st connId ntfSub {ntfQueueId = Just notifierId, ntfSubStatus = NASKey, ntfSubActionTs = ts} (NtfSubAction NSACreate) + updateNtfSubscription st connId NtfSubActionData {ntfSubscription = sub {ntfQueueId = Just notifierId, ntfSubStatus = NASKey}, action = NtfSubNTFAction NSACreate, actionTs = ts} ns <- asks ntfSupervisor atomically $ sendNtfSubCommand ns (connId, NSCNtfWorker ntfServer) _ -> ntfInternalError c connId "NSASmpKey - no active token" @@ -232,13 +233,13 @@ runNtfSMPWorker c srv doWork = forever $ do forM_ rq_ $ \rq -> disableQueueNotifications c rq withStore' c $ \db -> deleteNtfSubscription db connId -rescheduleAction :: AgentMonad m => TMVar () -> UTCTime -> NtfSubscription -> m Bool -rescheduleAction doWork ts NtfSubscription {ntfSubActionTs} - | ntfSubActionTs <= ts = pure False +rescheduleAction :: AgentMonad m => TMVar () -> UTCTime -> UTCTime -> m Bool +rescheduleAction doWork ts actionTs + | actionTs <= ts = pure False | otherwise = do void . atomically $ tryTakeTMVar doWork void . forkIO $ do - threadDelay $ diffInMicros ntfSubActionTs ts + threadDelay $ diffInMicros actionTs ts void . atomically $ tryPutTMVar doWork () pure True @@ -280,7 +281,7 @@ closeNtfSupervisor ns = do cancelNtfWorkers_ :: TMap ProtocolServer (TMVar (), Async ()) -> IO () cancelNtfWorkers_ wsVar = do - ws <- atomically $ stateTVar wsVar $ \ws -> (ws, M.empty) + ws <- atomically $ stateTVar wsVar (,M.empty) forM_ ws $ uninterruptibleCancel . snd getNtfServer :: AgentMonad m => AgentClient -> m (Maybe NtfServer) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 06b1f5c0d..3a4d7319e 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -123,7 +123,7 @@ import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Crypto.Ratchet (RatchetX448, SkippedMsgDiff (..), SkippedMsgKeys) import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String -import Simplex.Messaging.Notifications.Client (NtfAgentSubStatus (..), NtfServer, NtfSubAction (..), NtfSubOrSMPAction (..), NtfSubSMPAction (..), NtfSubscription (..), NtfTknAction (..), NtfToken (..)) +import Simplex.Messaging.Notifications.Client import Simplex.Messaging.Notifications.Protocol (DeviceToken (..), NtfSubscriptionId, NtfTknStatus (..), NtfTokenId, SMPQueueNtf (..)) import Simplex.Messaging.Parsers (blobFieldParser, fromTextField_) import Simplex.Messaging.Protocol (MsgBody, MsgFlags, ProtocolServer (..), RcvNtfDhSecret) @@ -719,7 +719,7 @@ getNtfSubscription db connId = db [sql| SELECT s.host, s.port, s.key_hash, ns.ntf_host, ns.ntf_port, ns.ntf_key_hash, - nsb.smp_ntf_id, nsb.ntf_sub_id, nsb.ntf_sub_status, nsb.ntf_sub_action_ts + nsb.smp_ntf_id, nsb.ntf_sub_id, nsb.ntf_sub_status FROM ntf_subscriptions nsb JOIN servers s ON s.host = nsb.smp_host AND s.port = nsb.smp_port JOIN ntf_servers ns USING (ntf_host, ntf_port) @@ -727,13 +727,13 @@ getNtfSubscription db connId = |] (Only connId) where - ntfSubscription (smpHost, smpPort, smpKeyHash, ntfHost, ntfPort, ntfKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, ntfSubActionTs) = + ntfSubscription (smpHost, smpPort, smpKeyHash, ntfHost, ntfPort, ntfKeyHash, ntfQueueId, ntfSubId, ntfSubStatus) = let smpServer = SMPServer smpHost smpPort smpKeyHash ntfServer = ProtocolServer ntfHost ntfPort ntfKeyHash - in NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus, ntfSubActionTs} + in NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus} -createNtfSubscription :: DB.Connection -> NtfSubscription -> NtfSubOrSMPAction -> IO () -createNtfSubscription db NtfSubscription {connId, smpServer = (SMPServer host port _), ntfQueueId, ntfServer = (SMPServer ntfHost ntfPort _), ntfSubId, ntfSubStatus, ntfSubActionTs} ntfAction = +createNtfSubscription :: DB.Connection -> NtfSubActionData -> IO () +createNtfSubscription db NtfSubActionData {action, actionTs, ntfSubscription = NtfSubscription {connId, smpServer = (SMPServer host port _), ntfQueueId, ntfServer = (SMPServer ntfHost ntfPort _), ntfSubId, ntfSubStatus}} = DB.execute db [sql| @@ -743,12 +743,12 @@ createNtfSubscription db NtfSubscription {connId, smpServer = (SMPServer host po VALUES (?,?,?,?,?,?,?,?,?,?,?) |] ( (connId, host, port, ntfQueueId, ntfHost, ntfPort, ntfSubId) - :. (ntfSubStatus, ntfSubAction, ntfSubSMPAction, ntfSubActionTs) + :. (ntfSubStatus, ntfSubAction, ntfSubSMPAction, actionTs) ) where - (ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction ntfAction + (ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction action -markNtfSubscriptionForDeletion :: DB.Connection -> ConnId -> NtfSubOrSMPAction -> IO () +markNtfSubscriptionForDeletion :: DB.Connection -> ConnId -> NtfSubAction -> IO () markNtfSubscriptionForDeletion db connId ntfAction = do updatedAt <- getCurrentTime DB.execute @@ -762,8 +762,8 @@ markNtfSubscriptionForDeletion db connId ntfAction = do where (ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction ntfAction -updateNtfSubscription :: DB.Connection -> ConnId -> NtfSubscription -> NtfSubOrSMPAction -> IO () -updateNtfSubscription db connId NtfSubscription {ntfQueueId, ntfSubId, ntfSubStatus, ntfSubActionTs} ntfAction = do +updateNtfSubscription :: DB.Connection -> ConnId -> NtfSubActionData -> IO () +updateNtfSubscription db connId NtfSubActionData {action, actionTs, ntfSubscription = NtfSubscription {ntfQueueId, ntfSubId, ntfSubStatus}} = do r <- maybeFirstRow fromOnly $ DB.query db "SELECT updated_by_supervisor FROM ntf_subscriptions WHERE conn_id = ?" (Only connId) forM_ r $ \updatedBySupervisor -> do updatedAt <- getCurrentTime @@ -785,9 +785,9 @@ updateNtfSubscription db connId NtfSubscription {ntfQueueId, ntfSubId, ntfSubSta SET smp_ntf_id = ?, ntf_sub_id = ?, ntf_sub_status = ?, ntf_sub_action = ?, ntf_sub_smp_action = ?, ntf_sub_action_ts = ?, updated_by_supervisor = ?, updated_at = ? WHERE conn_id = ? |] - (ntfQueueId, ntfSubId, ntfSubStatus, ntfSubAction, ntfSubSMPAction, ntfSubActionTs, False, updatedAt, connId) + (ntfQueueId, ntfSubId, ntfSubStatus, ntfSubAction, ntfSubSMPAction, actionTs, False, updatedAt, connId) where - (ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction ntfAction + (ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction action setNullNtfSubscriptionAction :: DB.Connection -> ConnId -> IO () setNullNtfSubscriptionAction db connId = do @@ -802,7 +802,7 @@ setNullNtfSubscriptionAction db connId = do SET ntf_sub_action = ?, ntf_sub_smp_action = ?, ntf_sub_action_ts = ?, updated_by_supervisor = ?, updated_at = ? WHERE conn_id = ? |] - (Nothing :: Maybe NtfSubAction, Nothing :: Maybe NtfSubSMPAction, Nothing :: Maybe UTCTime, False, updatedAt, connId) + (Nothing :: Maybe NtfSubNTFAction, Nothing :: Maybe NtfSubSMPAction, Nothing :: Maybe UTCTime, False, updatedAt, connId) deleteNtfSubscription :: DB.Connection -> ConnId -> IO () deleteNtfSubscription db connId = do @@ -821,11 +821,11 @@ deleteNtfSubscription db connId = do (Nothing :: Maybe SMP.NotifierId, Nothing :: Maybe NtfSubscriptionId, NASDeleted, False, updatedAt, connId) else DB.execute db "DELETE FROM ntf_subscriptions WHERE conn_id = ?" (Only connId) -getNextNtfSubAction :: DB.Connection -> NtfServer -> IO (Maybe (NtfSubscription, NtfSubAction)) +getNextNtfSubAction :: DB.Connection -> NtfServer -> IO (Maybe NtfSubNTFActionData) getNextNtfSubAction db ntfServer@(ProtocolServer ntfHost ntfPort _) = do - maybeFirstRow ntfSubscription getNtfSubAction_ $>>= \ntfSub@(NtfSubscription {connId}, _) -> do + maybeFirstRow ntfSubscriptionData getNtfSubAction_ $>>= \a@NtfSubNTFActionData {ntfSubscription = NtfSubscription {connId}} -> do DB.execute db "UPDATE ntf_subscriptions SET updated_by_supervisor = ? WHERE conn_id = ?" (False, connId) - pure $ Just ntfSub + pure $ Just a where getNtfSubAction_ = DB.query @@ -840,15 +840,16 @@ getNextNtfSubAction db ntfServer@(ProtocolServer ntfHost ntfPort _) = do LIMIT 1 |] (ntfHost, ntfPort) - ntfSubscription (connId, smpHost, smpPort, smpKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, ntfSubActionTs, ntfSubAction) = + ntfSubscriptionData (connId, smpHost, smpPort, smpKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, actionTs, ntfAction) = let smpServer = SMPServer smpHost smpPort smpKeyHash - in (NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus, ntfSubActionTs}, ntfSubAction) + ntfSubscription = NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus} + in NtfSubNTFActionData {ntfAction, actionTs, ntfSubscription} -getNextNtfSubSMPAction :: DB.Connection -> SMPServer -> IO (Maybe (NtfSubscription, NtfSubSMPAction)) +getNextNtfSubSMPAction :: DB.Connection -> SMPServer -> IO (Maybe NtfSubSMPActionData) getNextNtfSubSMPAction db smpServer@(SMPServer smpHost smpPort _) = do - maybeFirstRow ntfSubscription getNtfSubAction_ $>>= \ntfSub@(NtfSubscription {connId}, _) -> do + maybeFirstRow ntfSubscriptionData getNtfSubAction_ $>>= \a@NtfSubSMPActionData {ntfSubscription = NtfSubscription {connId}} -> do DB.execute db "UPDATE ntf_subscriptions SET updated_by_supervisor = ? WHERE conn_id = ?" (False, connId) - pure $ Just ntfSub + pure $ Just a where getNtfSubAction_ = DB.query @@ -858,14 +859,15 @@ getNextNtfSubSMPAction db smpServer@(SMPServer smpHost smpPort _) = do ns.smp_ntf_id, ns.ntf_sub_id, ns.ntf_sub_status, ns.ntf_sub_action_ts, ns.ntf_sub_smp_action FROM ntf_subscriptions ns JOIN ntf_servers s USING (ntf_host, ntf_port) - WHERE ns.smp_host = ? AND ns.smp_port = ? AND ns.ntf_sub_smp_action IS NOT NULL + WHERE ns.smp_host = ? AND ns.smp_port = ? AND ns.ntf_sub_smp_action IS NOT NULL AND ns.ntf_sub_action_ts IS NOT NULL ORDER BY ns.ntf_sub_action_ts ASC LIMIT 1 |] (smpHost, smpPort) - ntfSubscription (connId, ntfHost, ntfPort, ntfKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, ntfSubActionTs, ntfSubAction) = + ntfSubscriptionData (connId, ntfHost, ntfPort, ntfKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, actionTs, smpAction) = let ntfServer = ProtocolServer ntfHost ntfPort ntfKeyHash - in (NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus, ntfSubActionTs}, ntfSubAction) + ntfSubscription = NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus} + in NtfSubSMPActionData {ntfSubscription, smpAction, actionTs} getActiveNtfToken :: DB.Connection -> IO (Maybe NtfToken) getActiveNtfToken db = @@ -1311,6 +1313,6 @@ createWithRandomId gVar create = tryCreate 3 randomId :: TVar ChaChaDRG -> Int -> IO ByteString randomId gVar n = U.encode <$> (atomically . stateTVar gVar $ randomBytesGenerate n) -ntfSubAndSMPAction :: NtfSubOrSMPAction -> (Maybe NtfSubAction, Maybe NtfSubSMPAction) -ntfSubAndSMPAction (NtfSubAction nsa) = (Just nsa, Nothing) +ntfSubAndSMPAction :: NtfSubAction -> (Maybe NtfSubNTFAction, Maybe NtfSubSMPAction) +ntfSubAndSMPAction (NtfSubNTFAction nsa) = (Just nsa, Nothing) ntfSubAndSMPAction (NtfSubSMPAction nsa) = (Nothing, Just nsa) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20220608_v2.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20220608_v2.hs index 544ccf60f..eb769bcea 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20220608_v2.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20220608_v2.hs @@ -35,7 +35,7 @@ CREATE TABLE ntf_subscriptions ( ntf_port TEXT NOT NULL, ntf_sub_id BLOB, ntf_sub_status TEXT NOT NULL, -- see NtfAgentSubStatus - ntf_sub_action TEXT, -- if there is an action required on this subscription: NtfSubAction + ntf_sub_action TEXT, -- if there is an action required on this subscription: NtfSubNTFAction ntf_sub_smp_action TEXT, -- action with SMP server: NtfSubSMPAction; only one of this and ntf_sub_action can (should) be not null in same record ntf_sub_action_ts TEXT, -- the earliest time for the action, e.g. checks can be scheduled every X hours updated_by_supervisor INTEGER NOT NULL DEFAULT 0, -- to be checked on updates by workers to not overwrite supervisor command (state still should be updated) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql index bda5853e2..5deac05d5 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql @@ -180,7 +180,7 @@ CREATE TABLE ntf_subscriptions( ntf_port TEXT NOT NULL, ntf_sub_id BLOB, ntf_sub_status TEXT NOT NULL, -- see NtfAgentSubStatus - ntf_sub_action TEXT, -- if there is an action required on this subscription: NtfSubAction + ntf_sub_action TEXT, -- if there is an action required on this subscription: NtfSubNTFAction ntf_sub_smp_action TEXT, -- action with SMP server: NtfSubSMPAction; only one of this and ntf_sub_action can(should) be not null in same record ntf_sub_action_ts TEXT, -- the earliest time for the action, e.g. checks can be scheduled every X hours updated_by_supervisor INTEGER NOT NULL DEFAULT 0, -- to be checked on updates by workers to not overwrite supervisor command(state still should be updated) diff --git a/src/Simplex/Messaging/Notifications/Client.hs b/src/Simplex/Messaging/Notifications/Client.hs index 2432687b2..2e8577cce 100644 --- a/src/Simplex/Messaging/Notifications/Client.hs +++ b/src/Simplex/Messaging/Notifications/Client.hs @@ -136,15 +136,27 @@ newNtfToken deviceToken ntfServer (ntfPubKey, ntfPrivKey) ntfDhKeys ntfMode = ntfMode } -data NtfSubOrSMPAction = NtfSubAction NtfSubAction | NtfSubSMPAction NtfSubSMPAction +data NtfSubAction = NtfSubNTFAction NtfSubNTFAction | NtfSubSMPAction NtfSubSMPAction -data NtfSubAction +data NtfSubActionData = NtfSubActionData + { action :: NtfSubAction, + actionTs :: UTCTime, + ntfSubscription :: NtfSubscription + } + +data NtfSubNTFActionData = NtfSubNTFActionData + { ntfAction :: NtfSubNTFAction, + actionTs :: UTCTime, + ntfSubscription :: NtfSubscription + } + +data NtfSubNTFAction = NSACreate | NSACheck | NSADelete deriving (Show) -instance Encoding NtfSubAction where +instance Encoding NtfSubNTFAction where smpEncode = \case NSACreate -> "N" NSACheck -> "C" @@ -154,11 +166,17 @@ instance Encoding NtfSubAction where 'N' -> pure NSACreate 'C' -> pure NSACheck 'D' -> pure NSADelete - _ -> fail "bad NtfSubAction" + _ -> fail "bad NtfSubNTFAction" -instance FromField NtfSubAction where fromField = blobFieldDecoder smpDecode +instance FromField NtfSubNTFAction where fromField = blobFieldDecoder smpDecode -instance ToField NtfSubAction where toField = toField . smpEncode +instance ToField NtfSubNTFAction where toField = toField . smpEncode + +data NtfSubSMPActionData = NtfSubSMPActionData + { smpAction :: NtfSubSMPAction, + actionTs :: UTCTime, + ntfSubscription :: NtfSubscription + } data NtfSubSMPAction = NSASmpKey @@ -222,19 +240,17 @@ data NtfSubscription = NtfSubscription ntfQueueId :: Maybe NotifierId, ntfServer :: NtfServer, ntfSubId :: Maybe NtfSubscriptionId, - ntfSubStatus :: NtfAgentSubStatus, - ntfSubActionTs :: UTCTime + ntfSubStatus :: NtfAgentSubStatus } deriving (Show) -newNtfSubscription :: ConnId -> SMPServer -> Maybe NotifierId -> NtfServer -> NtfAgentSubStatus -> UTCTime -> NtfSubscription -newNtfSubscription connId smpServer ntfQueueId ntfServer ntfSubStatus ntfSubActionTs = +newNtfSubscription :: ConnId -> SMPServer -> Maybe NotifierId -> NtfServer -> NtfAgentSubStatus -> NtfSubscription +newNtfSubscription connId smpServer ntfQueueId ntfServer ntfSubStatus = NtfSubscription { connId, smpServer, ntfQueueId, ntfServer, ntfSubId = Nothing, - ntfSubStatus, - ntfSubActionTs + ntfSubStatus }