From c82fae72f2e3278cf6dd7c4e95ec4d07404fa75d Mon Sep 17 00:00:00 2001 From: JRoberts <8711996+jr-simplex@users.noreply.github.com> Date: Thu, 30 Jun 2022 15:34:16 +0400 Subject: [PATCH] ntf: refactor NtfSubAction (#445) --- .../Messaging/Agent/NtfSubSupervisor.hs | 28 +++++++------- src/Simplex/Messaging/Agent/Store/SQLite.hs | 35 +++++++++--------- src/Simplex/Messaging/Notifications/Types.hs | 37 ++----------------- 3 files changed, 35 insertions(+), 65 deletions(-) diff --git a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs index f760bfcf7..8e863d8fc 100644 --- a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs +++ b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs @@ -70,10 +70,10 @@ processNtfSub c (connId, cmd) = do case clientNtfCreds of Just ClientNtfCreds {notifierId} -> do let newSub = newNtfSubscription connId smpServer (Just notifierId) ntfServer NASKey - withStore' c $ \db -> createNtfSubscription db NtfSubActionData {action = NtfSubNTFAction NSACreate, actionTs = currentTime, ntfSubscription = newSub} + withStore' c $ \db -> createNtfSubscription db newSub (NtfSubNTFAction NSACreate) currentTime _ -> do let newSub = newNtfSubscription connId smpServer Nothing ntfServer NASNew - withStore' c $ \db -> createNtfSubscription db NtfSubActionData {action = NtfSubSMPAction NSASmpKey, actionTs = currentTime, ntfSubscription = newSub} + withStore' c $ \db -> createNtfSubscription db newSub (NtfSubSMPAction NSASmpKey) currentTime -- TODO optimize? -- TODO - read action in getNtfSubscription and decide which worker to create -- TODO - SMP worker can create Ntf worker on NKEY completion @@ -125,10 +125,10 @@ processNtfSub c (connId, cmd) = do runNtfWorker :: forall m. AgentMonad m => AgentClient -> NtfServer -> TMVar () -> m () runNtfWorker c srv doWork = forever $ do void . atomically $ readTMVar doWork - nextSub_ <- withStore' c (`getNextNtfSubAction` srv) + nextSub_ <- withStore' c (`getNextNtfSubNTFAction` srv) case nextSub_ of Nothing -> noWorkToDo - Just a@NtfSubNTFActionData {ntfSubscription = NtfSubscription {connId}} -> do + Just a@(NtfSubscription {connId}, _, _) -> do ri <- asks $ reconnectInterval . config withRetryInterval ri $ \loop -> processAction a @@ -142,11 +142,11 @@ runNtfWorker c srv doWork = forever $ do liftIO $ threadDelay throttle where noWorkToDo = void . atomically $ tryTakeTMVar doWork - processAction :: NtfSubNTFActionData -> m () - processAction NtfSubNTFActionData {ntfAction, actionTs, ntfSubscription = sub@NtfSubscription {connId, smpServer, ntfSubId}} = do + processAction :: (NtfSubscription, NtfSubNTFAction, NtfActionTs) -> m () + processAction (sub@NtfSubscription {connId, smpServer, ntfSubId}, action, actionTs) = do ts <- liftIO getCurrentTime unlessM (rescheduleAction doWork ts actionTs) $ - case ntfAction of + case action of NSACreate -> getNtfToken >>= \case Just tkn@NtfToken {ntfTokenId = Just tknId, ntfTknStatus = NTActive, ntfMode = NMInstant} -> do @@ -157,7 +157,7 @@ runNtfWorker c srv doWork = forever $ do -- TODO smaller retry until Active, less frequently (daily?) once Active let actionTs' = addUTCTime 30 ts withStore' c $ \db -> - updateNtfSubscription db connId NtfSubActionData {ntfSubscription = sub {ntfSubId = Just nSubId, ntfSubStatus = NASCreated NSNew}, action = NtfSubNTFAction NSACheck, actionTs = actionTs'} + updateNtfSubscription db connId sub {ntfSubId = Just nSubId, ntfSubStatus = NASCreated NSNew} (NtfSubNTFAction NSACheck) actionTs' _ -> ntfInternalError c connId "NSACreate - no notifier queue credentials" _ -> ntfInternalError c connId "NSACreate - no active token" NSACheck -> @@ -175,7 +175,7 @@ runNtfWorker c srv doWork = forever $ do (getNtfToken >>= \tkn -> forM_ tkn $ agentNtfDeleteSubscription c nSubId) `E.finally` do withStore' c $ \db -> - updateNtfSubscription db connId NtfSubActionData {ntfSubscription = sub {ntfSubId = Nothing, ntfSubStatus = NASOff}, action = NtfSubSMPAction NSASmpDelete, actionTs = ts} + updateNtfSubscription db connId sub {ntfSubId = Nothing, ntfSubStatus = NASOff} (NtfSubSMPAction NSASmpDelete) ts ns <- asks ntfSupervisor atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCNtfSMPWorker smpServer) Nothing -> ntfInternalError c connId "NSADelete - no subscription ID" @@ -186,7 +186,7 @@ runNtfWorker c srv doWork = forever $ do updateSub (NASCreated toStatus) (NtfSubNTFAction NSACheck) nextCheckTs updateSub toStatus toAction actionTs' = withStore' c $ \db -> - updateNtfSubscription db connId NtfSubActionData {ntfSubscription = sub {ntfSubStatus = toStatus}, action = toAction, actionTs = actionTs'} + updateNtfSubscription db connId sub {ntfSubStatus = toStatus} toAction actionTs' runNtfSMPWorker :: forall m. AgentMonad m => AgentClient -> SMPServer -> TMVar () -> m () runNtfSMPWorker c srv doWork = forever $ do @@ -194,7 +194,7 @@ runNtfSMPWorker c srv doWork = forever $ do nextSub_ <- withStore' c (`getNextNtfSubSMPAction` srv) case nextSub_ of Nothing -> noWorkToDo - Just a@NtfSubSMPActionData {ntfSubscription = NtfSubscription {connId}} -> do + Just a@(NtfSubscription {connId}, _, _) -> do ri <- asks $ reconnectInterval . config withRetryInterval ri $ \loop -> processAction a @@ -208,8 +208,8 @@ runNtfSMPWorker c srv doWork = forever $ do liftIO $ threadDelay throttle where noWorkToDo = void . atomically $ tryTakeTMVar doWork - processAction :: NtfSubSMPActionData -> m () - processAction NtfSubSMPActionData {smpAction, actionTs, ntfSubscription = sub@NtfSubscription {connId, ntfServer}} = do + processAction :: (NtfSubscription, NtfSubSMPAction, NtfActionTs) -> m () + processAction (sub@NtfSubscription {connId, ntfServer}, smpAction, actionTs) = do ts <- liftIO getCurrentTime unlessM (rescheduleAction doWork ts actionTs) $ case smpAction of @@ -224,7 +224,7 @@ runNtfSMPWorker c srv doWork = forever $ do let rcvNtfDhSecret = C.dh' rcvNtfSrvPubDhKey rcvNtfPrivDhKey withStore' c $ \st -> do setRcvQueueNtfCreds st connId ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret} - updateNtfSubscription st connId NtfSubActionData {ntfSubscription = sub {ntfQueueId = Just notifierId, ntfSubStatus = NASKey}, action = NtfSubNTFAction NSACreate, actionTs = ts} + updateNtfSubscription st connId sub {ntfQueueId = Just notifierId, ntfSubStatus = NASKey} (NtfSubNTFAction NSACreate) ts ns <- asks ntfSupervisor atomically $ sendNtfSubCommand ns (connId, NSCNtfWorker ntfServer) _ -> ntfInternalError c connId "NSASmpKey - no active token" diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index b71cfa8eb..0dacb98d8 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -79,7 +79,7 @@ module Simplex.Messaging.Agent.Store.SQLite updateNtfSubscription, setNullNtfSubscriptionAction, deleteNtfSubscription, - getNextNtfSubAction, + getNextNtfSubNTFAction, getNextNtfSubSMPAction, getActiveNtfToken, getNtfRcvQueue, @@ -123,8 +123,8 @@ import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Crypto.Ratchet (RatchetX448, SkippedMsgDiff (..), SkippedMsgKeys) import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String -import Simplex.Messaging.Notifications.Types import Simplex.Messaging.Notifications.Protocol (DeviceToken (..), NtfSubscriptionId, NtfTknStatus (..), NtfTokenId, SMPQueueNtf (..)) +import Simplex.Messaging.Notifications.Types import Simplex.Messaging.Parsers (blobFieldParser, fromTextField_) import Simplex.Messaging.Protocol (MsgBody, MsgFlags, ProtocolServer (..), RcvNtfDhSecret) import qualified Simplex.Messaging.Protocol as SMP @@ -732,8 +732,9 @@ getNtfSubscription db connId = ntfServer = ProtocolServer ntfHost ntfPort ntfKeyHash in NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus} -createNtfSubscription :: DB.Connection -> NtfSubActionData -> IO () -createNtfSubscription db NtfSubActionData {action, actionTs, ntfSubscription = NtfSubscription {connId, smpServer = (SMPServer host port _), ntfQueueId, ntfServer = (SMPServer ntfHost ntfPort _), ntfSubId, ntfSubStatus}} = +createNtfSubscription :: DB.Connection -> NtfSubscription -> NtfSubAction -> NtfActionTs -> IO () +createNtfSubscription db ntfSubscription action actionTs = do + let NtfSubscription {connId, smpServer = (SMPServer host port _), ntfQueueId, ntfServer = (SMPServer ntfHost ntfPort _), ntfSubId, ntfSubStatus} = ntfSubscription DB.execute db [sql| @@ -762,8 +763,8 @@ markNtfSubscriptionForDeletion db connId ntfAction = do where (ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction ntfAction -updateNtfSubscription :: DB.Connection -> ConnId -> NtfSubActionData -> IO () -updateNtfSubscription db connId NtfSubActionData {action, actionTs, ntfSubscription = NtfSubscription {ntfQueueId, ntfSubId, ntfSubStatus}} = do +updateNtfSubscription :: DB.Connection -> ConnId -> NtfSubscription -> NtfSubAction -> NtfActionTs -> IO () +updateNtfSubscription db connId NtfSubscription {ntfQueueId, ntfSubId, ntfSubStatus} action actionTs = do r <- maybeFirstRow fromOnly $ DB.query db "SELECT updated_by_supervisor FROM ntf_subscriptions WHERE conn_id = ?" (Only connId) forM_ r $ \updatedBySupervisor -> do updatedAt <- getCurrentTime @@ -821,9 +822,9 @@ deleteNtfSubscription db connId = do (Nothing :: Maybe SMP.NotifierId, Nothing :: Maybe NtfSubscriptionId, NASDeleted, False, updatedAt, connId) else DB.execute db "DELETE FROM ntf_subscriptions WHERE conn_id = ?" (Only connId) -getNextNtfSubAction :: DB.Connection -> NtfServer -> IO (Maybe NtfSubNTFActionData) -getNextNtfSubAction db ntfServer@(ProtocolServer ntfHost ntfPort _) = do - maybeFirstRow ntfSubscriptionData getNtfSubAction_ $>>= \a@NtfSubNTFActionData {ntfSubscription = NtfSubscription {connId}} -> do +getNextNtfSubNTFAction :: DB.Connection -> NtfServer -> IO (Maybe (NtfSubscription, NtfSubNTFAction, NtfActionTs)) +getNextNtfSubNTFAction db ntfServer@(ProtocolServer ntfHost ntfPort _) = do + maybeFirstRow ntfSubAction getNtfSubAction_ $>>= \a@(NtfSubscription {connId}, _, _) -> do DB.execute db "UPDATE ntf_subscriptions SET updated_by_supervisor = ? WHERE conn_id = ?" (False, connId) pure $ Just a where @@ -840,14 +841,14 @@ getNextNtfSubAction db ntfServer@(ProtocolServer ntfHost ntfPort _) = do LIMIT 1 |] (ntfHost, ntfPort) - ntfSubscriptionData (connId, smpHost, smpPort, smpKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, actionTs, ntfAction) = + ntfSubAction (connId, smpHost, smpPort, smpKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, actionTs, action) = let smpServer = SMPServer smpHost smpPort smpKeyHash ntfSubscription = NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus} - in NtfSubNTFActionData {ntfAction, actionTs, ntfSubscription} + in (ntfSubscription, action, actionTs) -getNextNtfSubSMPAction :: DB.Connection -> SMPServer -> IO (Maybe NtfSubSMPActionData) +getNextNtfSubSMPAction :: DB.Connection -> SMPServer -> IO (Maybe (NtfSubscription, NtfSubSMPAction, NtfActionTs)) getNextNtfSubSMPAction db smpServer@(SMPServer smpHost smpPort _) = do - maybeFirstRow ntfSubscriptionData getNtfSubAction_ $>>= \a@NtfSubSMPActionData {ntfSubscription = NtfSubscription {connId}} -> do + maybeFirstRow ntfSubAction getNtfSubAction_ $>>= \a@(NtfSubscription {connId}, _, _) -> do DB.execute db "UPDATE ntf_subscriptions SET updated_by_supervisor = ? WHERE conn_id = ?" (False, connId) pure $ Just a where @@ -864,10 +865,10 @@ getNextNtfSubSMPAction db smpServer@(SMPServer smpHost smpPort _) = do LIMIT 1 |] (smpHost, smpPort) - ntfSubscriptionData (connId, ntfHost, ntfPort, ntfKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, actionTs, smpAction) = + ntfSubAction (connId, ntfHost, ntfPort, ntfKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, actionTs, action) = let ntfServer = ProtocolServer ntfHost ntfPort ntfKeyHash ntfSubscription = NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus} - in NtfSubSMPActionData {ntfSubscription, smpAction, actionTs} + in (ntfSubscription, action, actionTs) getActiveNtfToken :: DB.Connection -> IO (Maybe NtfToken) getActiveNtfToken db = @@ -1314,5 +1315,5 @@ randomId :: TVar ChaChaDRG -> Int -> IO ByteString randomId gVar n = U.encode <$> (atomically . stateTVar gVar $ randomBytesGenerate n) ntfSubAndSMPAction :: NtfSubAction -> (Maybe NtfSubNTFAction, Maybe NtfSubSMPAction) -ntfSubAndSMPAction (NtfSubNTFAction nsa) = (Just nsa, Nothing) -ntfSubAndSMPAction (NtfSubSMPAction nsa) = (Nothing, Just nsa) +ntfSubAndSMPAction (NtfSubNTFAction action) = (Just action, Nothing) +ntfSubAndSMPAction (NtfSubSMPAction action) = (Nothing, Just action) diff --git a/src/Simplex/Messaging/Notifications/Types.hs b/src/Simplex/Messaging/Notifications/Types.hs index 69d4e30e9..ab6ac13f7 100644 --- a/src/Simplex/Messaging/Notifications/Types.hs +++ b/src/Simplex/Messaging/Notifications/Types.hs @@ -4,22 +4,7 @@ {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} -module Simplex.Messaging.Notifications.Types - ( NtfServer, - NtfTknAction (..), - NtfToken (..), - newNtfToken, - NtfSubAction (..), - NtfSubActionData (..), - NtfSubNTFActionData (..), - NtfSubNTFAction (..), - NtfSubSMPActionData (..), - NtfSubSMPAction (..), - NtfAgentSubStatus (..), - NtfSubscription (..), - newNtfSubscription, - ) -where +module Simplex.Messaging.Notifications.Types where import qualified Data.Attoparsec.ByteString.Char8 as A import Data.Text.Encoding (decodeLatin1, encodeUtf8) @@ -97,17 +82,7 @@ newNtfToken deviceToken ntfServer (ntfPubKey, ntfPrivKey) ntfDhKeys ntfMode = data NtfSubAction = NtfSubNTFAction NtfSubNTFAction | NtfSubSMPAction NtfSubSMPAction -data NtfSubActionData = NtfSubActionData - { action :: NtfSubAction, - actionTs :: UTCTime, - ntfSubscription :: NtfSubscription - } - -data NtfSubNTFActionData = NtfSubNTFActionData - { ntfAction :: NtfSubNTFAction, - actionTs :: UTCTime, - ntfSubscription :: NtfSubscription - } +type NtfActionTs = UTCTime data NtfSubNTFAction = NSACreate @@ -131,12 +106,6 @@ instance FromField NtfSubNTFAction where fromField = blobFieldDecoder smpDecode instance ToField NtfSubNTFAction where toField = toField . smpEncode -data NtfSubSMPActionData = NtfSubSMPActionData - { smpAction :: NtfSubSMPAction, - actionTs :: UTCTime, - ntfSubscription :: NtfSubscription - } - data NtfSubSMPAction = NSASmpKey | NSASmpDelete @@ -212,4 +181,4 @@ newNtfSubscription connId smpServer ntfQueueId ntfServer ntfSubStatus = ntfServer, ntfSubId = Nothing, ntfSubStatus - } \ No newline at end of file + }