ntf: refactor NtfSubAction (#445)

This commit is contained in:
JRoberts
2022-06-30 15:34:16 +04:00
committed by GitHub
parent 9695786aa5
commit c82fae72f2
3 changed files with 35 additions and 65 deletions

View File

@@ -70,10 +70,10 @@ processNtfSub c (connId, cmd) = do
case clientNtfCreds of
Just ClientNtfCreds {notifierId} -> do
let newSub = newNtfSubscription connId smpServer (Just notifierId) ntfServer NASKey
withStore' c $ \db -> createNtfSubscription db NtfSubActionData {action = NtfSubNTFAction NSACreate, actionTs = currentTime, ntfSubscription = newSub}
withStore' c $ \db -> createNtfSubscription db newSub (NtfSubNTFAction NSACreate) currentTime
_ -> do
let newSub = newNtfSubscription connId smpServer Nothing ntfServer NASNew
withStore' c $ \db -> createNtfSubscription db NtfSubActionData {action = NtfSubSMPAction NSASmpKey, actionTs = currentTime, ntfSubscription = newSub}
withStore' c $ \db -> createNtfSubscription db newSub (NtfSubSMPAction NSASmpKey) currentTime
-- TODO optimize?
-- TODO - read action in getNtfSubscription and decide which worker to create
-- TODO - SMP worker can create Ntf worker on NKEY completion
@@ -125,10 +125,10 @@ processNtfSub c (connId, cmd) = do
runNtfWorker :: forall m. AgentMonad m => AgentClient -> NtfServer -> TMVar () -> m ()
runNtfWorker c srv doWork = forever $ do
void . atomically $ readTMVar doWork
nextSub_ <- withStore' c (`getNextNtfSubAction` srv)
nextSub_ <- withStore' c (`getNextNtfSubNTFAction` srv)
case nextSub_ of
Nothing -> noWorkToDo
Just a@NtfSubNTFActionData {ntfSubscription = NtfSubscription {connId}} -> do
Just a@(NtfSubscription {connId}, _, _) -> do
ri <- asks $ reconnectInterval . config
withRetryInterval ri $ \loop ->
processAction a
@@ -142,11 +142,11 @@ runNtfWorker c srv doWork = forever $ do
liftIO $ threadDelay throttle
where
noWorkToDo = void . atomically $ tryTakeTMVar doWork
processAction :: NtfSubNTFActionData -> m ()
processAction NtfSubNTFActionData {ntfAction, actionTs, ntfSubscription = sub@NtfSubscription {connId, smpServer, ntfSubId}} = do
processAction :: (NtfSubscription, NtfSubNTFAction, NtfActionTs) -> m ()
processAction (sub@NtfSubscription {connId, smpServer, ntfSubId}, action, actionTs) = do
ts <- liftIO getCurrentTime
unlessM (rescheduleAction doWork ts actionTs) $
case ntfAction of
case action of
NSACreate ->
getNtfToken >>= \case
Just tkn@NtfToken {ntfTokenId = Just tknId, ntfTknStatus = NTActive, ntfMode = NMInstant} -> do
@@ -157,7 +157,7 @@ runNtfWorker c srv doWork = forever $ do
-- TODO smaller retry until Active, less frequently (daily?) once Active
let actionTs' = addUTCTime 30 ts
withStore' c $ \db ->
updateNtfSubscription db connId NtfSubActionData {ntfSubscription = sub {ntfSubId = Just nSubId, ntfSubStatus = NASCreated NSNew}, action = NtfSubNTFAction NSACheck, actionTs = actionTs'}
updateNtfSubscription db connId sub {ntfSubId = Just nSubId, ntfSubStatus = NASCreated NSNew} (NtfSubNTFAction NSACheck) actionTs'
_ -> ntfInternalError c connId "NSACreate - no notifier queue credentials"
_ -> ntfInternalError c connId "NSACreate - no active token"
NSACheck ->
@@ -175,7 +175,7 @@ runNtfWorker c srv doWork = forever $ do
(getNtfToken >>= \tkn -> forM_ tkn $ agentNtfDeleteSubscription c nSubId)
`E.finally` do
withStore' c $ \db ->
updateNtfSubscription db connId NtfSubActionData {ntfSubscription = sub {ntfSubId = Nothing, ntfSubStatus = NASOff}, action = NtfSubSMPAction NSASmpDelete, actionTs = ts}
updateNtfSubscription db connId sub {ntfSubId = Nothing, ntfSubStatus = NASOff} (NtfSubSMPAction NSASmpDelete) ts
ns <- asks ntfSupervisor
atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCNtfSMPWorker smpServer)
Nothing -> ntfInternalError c connId "NSADelete - no subscription ID"
@@ -186,7 +186,7 @@ runNtfWorker c srv doWork = forever $ do
updateSub (NASCreated toStatus) (NtfSubNTFAction NSACheck) nextCheckTs
updateSub toStatus toAction actionTs' =
withStore' c $ \db ->
updateNtfSubscription db connId NtfSubActionData {ntfSubscription = sub {ntfSubStatus = toStatus}, action = toAction, actionTs = actionTs'}
updateNtfSubscription db connId sub {ntfSubStatus = toStatus} toAction actionTs'
runNtfSMPWorker :: forall m. AgentMonad m => AgentClient -> SMPServer -> TMVar () -> m ()
runNtfSMPWorker c srv doWork = forever $ do
@@ -194,7 +194,7 @@ runNtfSMPWorker c srv doWork = forever $ do
nextSub_ <- withStore' c (`getNextNtfSubSMPAction` srv)
case nextSub_ of
Nothing -> noWorkToDo
Just a@NtfSubSMPActionData {ntfSubscription = NtfSubscription {connId}} -> do
Just a@(NtfSubscription {connId}, _, _) -> do
ri <- asks $ reconnectInterval . config
withRetryInterval ri $ \loop ->
processAction a
@@ -208,8 +208,8 @@ runNtfSMPWorker c srv doWork = forever $ do
liftIO $ threadDelay throttle
where
noWorkToDo = void . atomically $ tryTakeTMVar doWork
processAction :: NtfSubSMPActionData -> m ()
processAction NtfSubSMPActionData {smpAction, actionTs, ntfSubscription = sub@NtfSubscription {connId, ntfServer}} = do
processAction :: (NtfSubscription, NtfSubSMPAction, NtfActionTs) -> m ()
processAction (sub@NtfSubscription {connId, ntfServer}, smpAction, actionTs) = do
ts <- liftIO getCurrentTime
unlessM (rescheduleAction doWork ts actionTs) $
case smpAction of
@@ -224,7 +224,7 @@ runNtfSMPWorker c srv doWork = forever $ do
let rcvNtfDhSecret = C.dh' rcvNtfSrvPubDhKey rcvNtfPrivDhKey
withStore' c $ \st -> do
setRcvQueueNtfCreds st connId ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret}
updateNtfSubscription st connId NtfSubActionData {ntfSubscription = sub {ntfQueueId = Just notifierId, ntfSubStatus = NASKey}, action = NtfSubNTFAction NSACreate, actionTs = ts}
updateNtfSubscription st connId sub {ntfQueueId = Just notifierId, ntfSubStatus = NASKey} (NtfSubNTFAction NSACreate) ts
ns <- asks ntfSupervisor
atomically $ sendNtfSubCommand ns (connId, NSCNtfWorker ntfServer)
_ -> ntfInternalError c connId "NSASmpKey - no active token"

View File

@@ -79,7 +79,7 @@ module Simplex.Messaging.Agent.Store.SQLite
updateNtfSubscription,
setNullNtfSubscriptionAction,
deleteNtfSubscription,
getNextNtfSubAction,
getNextNtfSubNTFAction,
getNextNtfSubSMPAction,
getActiveNtfToken,
getNtfRcvQueue,
@@ -123,8 +123,8 @@ import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.Ratchet (RatchetX448, SkippedMsgDiff (..), SkippedMsgKeys)
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Notifications.Types
import Simplex.Messaging.Notifications.Protocol (DeviceToken (..), NtfSubscriptionId, NtfTknStatus (..), NtfTokenId, SMPQueueNtf (..))
import Simplex.Messaging.Notifications.Types
import Simplex.Messaging.Parsers (blobFieldParser, fromTextField_)
import Simplex.Messaging.Protocol (MsgBody, MsgFlags, ProtocolServer (..), RcvNtfDhSecret)
import qualified Simplex.Messaging.Protocol as SMP
@@ -732,8 +732,9 @@ getNtfSubscription db connId =
ntfServer = ProtocolServer ntfHost ntfPort ntfKeyHash
in NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus}
createNtfSubscription :: DB.Connection -> NtfSubActionData -> IO ()
createNtfSubscription db NtfSubActionData {action, actionTs, ntfSubscription = NtfSubscription {connId, smpServer = (SMPServer host port _), ntfQueueId, ntfServer = (SMPServer ntfHost ntfPort _), ntfSubId, ntfSubStatus}} =
createNtfSubscription :: DB.Connection -> NtfSubscription -> NtfSubAction -> NtfActionTs -> IO ()
createNtfSubscription db ntfSubscription action actionTs = do
let NtfSubscription {connId, smpServer = (SMPServer host port _), ntfQueueId, ntfServer = (SMPServer ntfHost ntfPort _), ntfSubId, ntfSubStatus} = ntfSubscription
DB.execute
db
[sql|
@@ -762,8 +763,8 @@ markNtfSubscriptionForDeletion db connId ntfAction = do
where
(ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction ntfAction
updateNtfSubscription :: DB.Connection -> ConnId -> NtfSubActionData -> IO ()
updateNtfSubscription db connId NtfSubActionData {action, actionTs, ntfSubscription = NtfSubscription {ntfQueueId, ntfSubId, ntfSubStatus}} = do
updateNtfSubscription :: DB.Connection -> ConnId -> NtfSubscription -> NtfSubAction -> NtfActionTs -> IO ()
updateNtfSubscription db connId NtfSubscription {ntfQueueId, ntfSubId, ntfSubStatus} action actionTs = do
r <- maybeFirstRow fromOnly $ DB.query db "SELECT updated_by_supervisor FROM ntf_subscriptions WHERE conn_id = ?" (Only connId)
forM_ r $ \updatedBySupervisor -> do
updatedAt <- getCurrentTime
@@ -821,9 +822,9 @@ deleteNtfSubscription db connId = do
(Nothing :: Maybe SMP.NotifierId, Nothing :: Maybe NtfSubscriptionId, NASDeleted, False, updatedAt, connId)
else DB.execute db "DELETE FROM ntf_subscriptions WHERE conn_id = ?" (Only connId)
getNextNtfSubAction :: DB.Connection -> NtfServer -> IO (Maybe NtfSubNTFActionData)
getNextNtfSubAction db ntfServer@(ProtocolServer ntfHost ntfPort _) = do
maybeFirstRow ntfSubscriptionData getNtfSubAction_ $>>= \a@NtfSubNTFActionData {ntfSubscription = NtfSubscription {connId}} -> do
getNextNtfSubNTFAction :: DB.Connection -> NtfServer -> IO (Maybe (NtfSubscription, NtfSubNTFAction, NtfActionTs))
getNextNtfSubNTFAction db ntfServer@(ProtocolServer ntfHost ntfPort _) = do
maybeFirstRow ntfSubAction getNtfSubAction_ $>>= \a@(NtfSubscription {connId}, _, _) -> do
DB.execute db "UPDATE ntf_subscriptions SET updated_by_supervisor = ? WHERE conn_id = ?" (False, connId)
pure $ Just a
where
@@ -840,14 +841,14 @@ getNextNtfSubAction db ntfServer@(ProtocolServer ntfHost ntfPort _) = do
LIMIT 1
|]
(ntfHost, ntfPort)
ntfSubscriptionData (connId, smpHost, smpPort, smpKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, actionTs, ntfAction) =
ntfSubAction (connId, smpHost, smpPort, smpKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, actionTs, action) =
let smpServer = SMPServer smpHost smpPort smpKeyHash
ntfSubscription = NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus}
in NtfSubNTFActionData {ntfAction, actionTs, ntfSubscription}
in (ntfSubscription, action, actionTs)
getNextNtfSubSMPAction :: DB.Connection -> SMPServer -> IO (Maybe NtfSubSMPActionData)
getNextNtfSubSMPAction :: DB.Connection -> SMPServer -> IO (Maybe (NtfSubscription, NtfSubSMPAction, NtfActionTs))
getNextNtfSubSMPAction db smpServer@(SMPServer smpHost smpPort _) = do
maybeFirstRow ntfSubscriptionData getNtfSubAction_ $>>= \a@NtfSubSMPActionData {ntfSubscription = NtfSubscription {connId}} -> do
maybeFirstRow ntfSubAction getNtfSubAction_ $>>= \a@(NtfSubscription {connId}, _, _) -> do
DB.execute db "UPDATE ntf_subscriptions SET updated_by_supervisor = ? WHERE conn_id = ?" (False, connId)
pure $ Just a
where
@@ -864,10 +865,10 @@ getNextNtfSubSMPAction db smpServer@(SMPServer smpHost smpPort _) = do
LIMIT 1
|]
(smpHost, smpPort)
ntfSubscriptionData (connId, ntfHost, ntfPort, ntfKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, actionTs, smpAction) =
ntfSubAction (connId, ntfHost, ntfPort, ntfKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, actionTs, action) =
let ntfServer = ProtocolServer ntfHost ntfPort ntfKeyHash
ntfSubscription = NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus}
in NtfSubSMPActionData {ntfSubscription, smpAction, actionTs}
in (ntfSubscription, action, actionTs)
getActiveNtfToken :: DB.Connection -> IO (Maybe NtfToken)
getActiveNtfToken db =
@@ -1314,5 +1315,5 @@ randomId :: TVar ChaChaDRG -> Int -> IO ByteString
randomId gVar n = U.encode <$> (atomically . stateTVar gVar $ randomBytesGenerate n)
ntfSubAndSMPAction :: NtfSubAction -> (Maybe NtfSubNTFAction, Maybe NtfSubSMPAction)
ntfSubAndSMPAction (NtfSubNTFAction nsa) = (Just nsa, Nothing)
ntfSubAndSMPAction (NtfSubSMPAction nsa) = (Nothing, Just nsa)
ntfSubAndSMPAction (NtfSubNTFAction action) = (Just action, Nothing)
ntfSubAndSMPAction (NtfSubSMPAction action) = (Nothing, Just action)

View File

@@ -4,22 +4,7 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
module Simplex.Messaging.Notifications.Types
( NtfServer,
NtfTknAction (..),
NtfToken (..),
newNtfToken,
NtfSubAction (..),
NtfSubActionData (..),
NtfSubNTFActionData (..),
NtfSubNTFAction (..),
NtfSubSMPActionData (..),
NtfSubSMPAction (..),
NtfAgentSubStatus (..),
NtfSubscription (..),
newNtfSubscription,
)
where
module Simplex.Messaging.Notifications.Types where
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.Text.Encoding (decodeLatin1, encodeUtf8)
@@ -97,17 +82,7 @@ newNtfToken deviceToken ntfServer (ntfPubKey, ntfPrivKey) ntfDhKeys ntfMode =
data NtfSubAction = NtfSubNTFAction NtfSubNTFAction | NtfSubSMPAction NtfSubSMPAction
data NtfSubActionData = NtfSubActionData
{ action :: NtfSubAction,
actionTs :: UTCTime,
ntfSubscription :: NtfSubscription
}
data NtfSubNTFActionData = NtfSubNTFActionData
{ ntfAction :: NtfSubNTFAction,
actionTs :: UTCTime,
ntfSubscription :: NtfSubscription
}
type NtfActionTs = UTCTime
data NtfSubNTFAction
= NSACreate
@@ -131,12 +106,6 @@ instance FromField NtfSubNTFAction where fromField = blobFieldDecoder smpDecode
instance ToField NtfSubNTFAction where toField = toField . smpEncode
data NtfSubSMPActionData = NtfSubSMPActionData
{ smpAction :: NtfSubSMPAction,
actionTs :: UTCTime,
ntfSubscription :: NtfSubscription
}
data NtfSubSMPAction
= NSASmpKey
| NSASmpDelete
@@ -212,4 +181,4 @@ newNtfSubscription connId smpServer ntfQueueId ntfServer ntfSubStatus =
ntfServer,
ntfSubId = Nothing,
ntfSubStatus
}
}