mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-14 20:35:08 +00:00
ntf: fix null action ts (#443)
Co-authored-by: JRoberts <8711996+jr-simplex@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
79d67694ed
commit
56ef95e8ae
@@ -81,7 +81,6 @@ import Data.Maybe (catMaybes)
|
||||
import Data.Set (Set)
|
||||
import Data.Text.Encoding
|
||||
import Data.Word (Word16)
|
||||
import Database.SQLite.Simple (SQLError)
|
||||
import qualified Database.SQLite.Simple as DB
|
||||
-- import GHC.Conc (unsafeIOToSTM)
|
||||
import Simplex.Messaging.Agent.Env.SQLite
|
||||
@@ -735,7 +734,7 @@ withStore c action = do
|
||||
atomically $ endAgentOperation c AODatabase
|
||||
liftEither $ first storeError r
|
||||
where
|
||||
handleInternal :: SQLError -> IO (Either StoreError a)
|
||||
handleInternal :: E.SomeException -> IO (Either StoreError a)
|
||||
handleInternal = pure . Left . SEInternal . bshow
|
||||
storeError :: StoreError -> AgentErrorType
|
||||
storeError = \case
|
||||
|
||||
@@ -29,6 +29,7 @@ import Control.Monad.Reader
|
||||
import Crypto.Random
|
||||
import Data.List.NonEmpty (NonEmpty)
|
||||
import Data.Time.Clock (NominalDiffTime, nominalDay)
|
||||
import Data.Word (Word16)
|
||||
import Network.Socket
|
||||
import Numeric.Natural
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
@@ -46,7 +47,6 @@ import Simplex.Messaging.Version
|
||||
import System.Random (StdGen, newStdGen)
|
||||
import UnliftIO (Async)
|
||||
import UnliftIO.STM
|
||||
import Data.Word (Word16)
|
||||
|
||||
-- | Agent monad with MonadReader Env and MonadError AgentErrorType
|
||||
type AgentMonad m = (MonadUnliftIO m, MonadReader Env m, MonadError AgentErrorType m)
|
||||
@@ -142,6 +142,7 @@ data NtfSupervisor = NtfSupervisor
|
||||
}
|
||||
|
||||
data NtfSupervisorCommand = NSCCreate | NSCDelete | NSCSmpDelete | NSCNtfWorker NtfServer | NSCNtfSMPWorker SMPServer
|
||||
deriving (Show)
|
||||
|
||||
newNtfSubSupervisor :: Natural -> STM NtfSupervisor
|
||||
newNtfSubSupervisor qSize = do
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
|
||||
module Simplex.Messaging.Agent.NtfSubSupervisor
|
||||
( runNtfSupervisor,
|
||||
@@ -59,7 +60,6 @@ processNtfSub c (connId, cmd) = do
|
||||
ntfServer_ <- getNtfServer c
|
||||
case cmd of
|
||||
NSCCreate -> do
|
||||
-- TODO merge getNtfSubscription and getRcvQueue into one method to read both in same transaction?
|
||||
(sub_, RcvQueue {server = smpServer, clientNtfCreds}) <- withStore c $ \db -> runExceptT $ do
|
||||
sub_ <- liftIO $ getNtfSubscription db connId
|
||||
q <- ExceptT $ getRcvQueue db connId
|
||||
@@ -69,11 +69,11 @@ processNtfSub c (connId, cmd) = do
|
||||
currentTime <- liftIO getCurrentTime
|
||||
case clientNtfCreds of
|
||||
Just ClientNtfCreds {notifierId} -> do
|
||||
let newSub = newNtfSubscription connId smpServer (Just notifierId) ntfServer NASKey currentTime
|
||||
withStore' c $ \db -> createNtfSubscription db newSub (NtfSubAction NSACreate)
|
||||
let newSub = newNtfSubscription connId smpServer (Just notifierId) ntfServer NASKey
|
||||
withStore' c $ \db -> createNtfSubscription db NtfSubActionData {action = NtfSubNTFAction NSACreate, actionTs = currentTime, ntfSubscription = newSub}
|
||||
_ -> do
|
||||
let newSub = newNtfSubscription connId smpServer Nothing ntfServer NASNew currentTime
|
||||
withStore' c $ \db -> createNtfSubscription db newSub (NtfSubSMPAction NSASmpKey)
|
||||
let newSub = newNtfSubscription connId smpServer Nothing ntfServer NASNew
|
||||
withStore' c $ \db -> createNtfSubscription db NtfSubActionData {action = NtfSubSMPAction NSASmpKey, actionTs = currentTime, ntfSubscription = newSub}
|
||||
-- TODO optimize?
|
||||
-- TODO - read action in getNtfSubscription and decide which worker to create
|
||||
-- TODO - SMP worker can create Ntf worker on NKEY completion
|
||||
@@ -90,7 +90,7 @@ processNtfSub c (connId, cmd) = do
|
||||
addNtfWorker ntfServer
|
||||
_ -> pure () -- error - notification server not configured
|
||||
NSCDelete -> do
|
||||
withStore' c $ \db -> markNtfSubscriptionForDeletion db connId (NtfSubAction NSADelete)
|
||||
withStore' c $ \db -> markNtfSubscriptionForDeletion db connId (NtfSubNTFAction NSADelete)
|
||||
case ntfServer_ of
|
||||
(Just ntfServer) -> addNtfWorker ntfServer
|
||||
_ -> pure ()
|
||||
@@ -128,10 +128,10 @@ runNtfWorker c srv doWork = forever $ do
|
||||
nextSub_ <- withStore' c (`getNextNtfSubAction` srv)
|
||||
case nextSub_ of
|
||||
Nothing -> noWorkToDo
|
||||
Just ntfSub@(NtfSubscription {connId}, _) -> do
|
||||
Just a@NtfSubNTFActionData {ntfSubscription = NtfSubscription {connId}} -> do
|
||||
ri <- asks $ reconnectInterval . config
|
||||
withRetryInterval ri $ \loop ->
|
||||
processAction ntfSub
|
||||
processAction a
|
||||
`catchError` ( \e ->
|
||||
case e of
|
||||
BROKER NETWORK -> loop
|
||||
@@ -142,11 +142,11 @@ runNtfWorker c srv doWork = forever $ do
|
||||
liftIO $ threadDelay throttle
|
||||
where
|
||||
noWorkToDo = void . atomically $ tryTakeTMVar doWork
|
||||
processAction :: (NtfSubscription, NtfSubAction) -> m ()
|
||||
processAction (ntfSub@NtfSubscription {connId, smpServer, ntfSubId}, ntfSubAction) = do
|
||||
processAction :: NtfSubNTFActionData -> m ()
|
||||
processAction NtfSubNTFActionData {ntfAction, actionTs, ntfSubscription = sub@NtfSubscription {connId, smpServer, ntfSubId}} = do
|
||||
ts <- liftIO getCurrentTime
|
||||
unlessM (rescheduleAction doWork ts ntfSub) $
|
||||
case ntfSubAction of
|
||||
unlessM (rescheduleAction doWork ts actionTs) $
|
||||
case ntfAction of
|
||||
NSACreate ->
|
||||
getNtfToken >>= \case
|
||||
Just tkn@NtfToken {ntfTokenId = Just tknId, ntfTknStatus = NTActive, ntfMode = NMInstant} -> do
|
||||
@@ -154,9 +154,10 @@ runNtfWorker c srv doWork = forever $ do
|
||||
case clientNtfCreds of
|
||||
Just ClientNtfCreds {ntfPrivateKey, notifierId} -> do
|
||||
nSubId <- agentNtfCreateSubscription c tknId tkn (SMPQueueNtf smpServer notifierId) ntfPrivateKey
|
||||
let actionTs = addUTCTime 30 ts
|
||||
-- TODO smaller retry until Active, less frequently (daily?) once Active
|
||||
let actionTs' = addUTCTime 30 ts
|
||||
withStore' c $ \db ->
|
||||
updateNtfSubscription db connId ntfSub {ntfSubId = Just nSubId, ntfSubStatus = NASCreated NSNew, ntfSubActionTs = actionTs} (NtfSubAction NSACheck)
|
||||
updateNtfSubscription db connId NtfSubActionData {ntfSubscription = sub {ntfSubId = Just nSubId, ntfSubStatus = NASCreated NSNew}, action = NtfSubNTFAction NSACheck, actionTs = actionTs'}
|
||||
_ -> ntfInternalError c connId "NSACreate - no notifier queue credentials"
|
||||
_ -> ntfInternalError c connId "NSACreate - no active token"
|
||||
NSACheck ->
|
||||
@@ -165,7 +166,7 @@ runNtfWorker c srv doWork = forever $ do
|
||||
case ntfSubId of
|
||||
Just nSubId ->
|
||||
agentNtfCheckSubscription c nSubId tkn >>= \case
|
||||
NSSMPAuth -> updateSub (NASCreated NSSMPAuth) (NtfSubAction NSADelete) ts -- TODO re-create subscription?
|
||||
NSSMPAuth -> updateSub (NASCreated NSSMPAuth) (NtfSubNTFAction NSADelete) ts -- TODO re-create subscription?
|
||||
status -> updateSubNextCheck ts status
|
||||
Nothing -> ntfInternalError c connId "NSACheck - no subscription ID"
|
||||
_ -> ntfInternalError c connId "NSACheck - no active token"
|
||||
@@ -174,7 +175,7 @@ runNtfWorker c srv doWork = forever $ do
|
||||
(getNtfToken >>= \tkn -> forM_ tkn $ agentNtfDeleteSubscription c nSubId)
|
||||
`E.finally` do
|
||||
withStore' c $ \db ->
|
||||
updateNtfSubscription db connId ntfSub {ntfSubId = Nothing, ntfSubStatus = NASOff, ntfSubActionTs = ts} (NtfSubSMPAction NSASmpDelete)
|
||||
updateNtfSubscription db connId NtfSubActionData {ntfSubscription = sub {ntfSubId = Nothing, ntfSubStatus = NASOff}, action = NtfSubSMPAction NSASmpDelete, actionTs = ts}
|
||||
ns <- asks ntfSupervisor
|
||||
atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCNtfSMPWorker smpServer)
|
||||
Nothing -> ntfInternalError c connId "NSADelete - no subscription ID"
|
||||
@@ -182,10 +183,10 @@ runNtfWorker c srv doWork = forever $ do
|
||||
updateSubNextCheck ts toStatus = do
|
||||
checkInterval <- asks $ ntfSubCheckInterval . config
|
||||
let nextCheckTs = addUTCTime checkInterval ts
|
||||
updateSub (NASCreated toStatus) (NtfSubAction NSACheck) nextCheckTs
|
||||
updateSub toStatus toAction actionTs =
|
||||
updateSub (NASCreated toStatus) (NtfSubNTFAction NSACheck) nextCheckTs
|
||||
updateSub toStatus toAction actionTs' =
|
||||
withStore' c $ \db ->
|
||||
updateNtfSubscription db connId ntfSub {ntfSubStatus = toStatus, ntfSubActionTs = actionTs} toAction
|
||||
updateNtfSubscription db connId NtfSubActionData {ntfSubscription = sub {ntfSubStatus = toStatus}, action = toAction, actionTs = actionTs'}
|
||||
|
||||
runNtfSMPWorker :: forall m. AgentMonad m => AgentClient -> SMPServer -> TMVar () -> m ()
|
||||
runNtfSMPWorker c srv doWork = forever $ do
|
||||
@@ -193,10 +194,10 @@ runNtfSMPWorker c srv doWork = forever $ do
|
||||
nextSub_ <- withStore' c (`getNextNtfSubSMPAction` srv)
|
||||
case nextSub_ of
|
||||
Nothing -> noWorkToDo
|
||||
Just ntfSub@(NtfSubscription {connId}, _) -> do
|
||||
Just a@NtfSubSMPActionData {ntfSubscription = NtfSubscription {connId}} -> do
|
||||
ri <- asks $ reconnectInterval . config
|
||||
withRetryInterval ri $ \loop ->
|
||||
processAction ntfSub
|
||||
processAction a
|
||||
`catchError` ( \e ->
|
||||
case e of
|
||||
BROKER NETWORK -> loop
|
||||
@@ -207,11 +208,11 @@ runNtfSMPWorker c srv doWork = forever $ do
|
||||
liftIO $ threadDelay throttle
|
||||
where
|
||||
noWorkToDo = void . atomically $ tryTakeTMVar doWork
|
||||
processAction :: (NtfSubscription, NtfSubSMPAction) -> m ()
|
||||
processAction (ntfSub@NtfSubscription {connId, ntfServer}, ntfSubAction) = do
|
||||
processAction :: NtfSubSMPActionData -> m ()
|
||||
processAction NtfSubSMPActionData {smpAction, actionTs, ntfSubscription = sub@NtfSubscription {connId, ntfServer}} = do
|
||||
ts <- liftIO getCurrentTime
|
||||
unlessM (rescheduleAction doWork ts ntfSub) $
|
||||
case ntfSubAction of
|
||||
unlessM (rescheduleAction doWork ts actionTs) $
|
||||
case smpAction of
|
||||
NSASmpKey ->
|
||||
getNtfToken >>= \case
|
||||
Just NtfToken {ntfTknStatus = NTActive, ntfMode = NMInstant} -> do
|
||||
@@ -223,7 +224,7 @@ runNtfSMPWorker c srv doWork = forever $ do
|
||||
let rcvNtfDhSecret = C.dh' rcvNtfSrvPubDhKey rcvNtfPrivDhKey
|
||||
withStore' c $ \st -> do
|
||||
setRcvQueueNtfCreds st connId ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret}
|
||||
updateNtfSubscription st connId ntfSub {ntfQueueId = Just notifierId, ntfSubStatus = NASKey, ntfSubActionTs = ts} (NtfSubAction NSACreate)
|
||||
updateNtfSubscription st connId NtfSubActionData {ntfSubscription = sub {ntfQueueId = Just notifierId, ntfSubStatus = NASKey}, action = NtfSubNTFAction NSACreate, actionTs = ts}
|
||||
ns <- asks ntfSupervisor
|
||||
atomically $ sendNtfSubCommand ns (connId, NSCNtfWorker ntfServer)
|
||||
_ -> ntfInternalError c connId "NSASmpKey - no active token"
|
||||
@@ -232,13 +233,13 @@ runNtfSMPWorker c srv doWork = forever $ do
|
||||
forM_ rq_ $ \rq -> disableQueueNotifications c rq
|
||||
withStore' c $ \db -> deleteNtfSubscription db connId
|
||||
|
||||
rescheduleAction :: AgentMonad m => TMVar () -> UTCTime -> NtfSubscription -> m Bool
|
||||
rescheduleAction doWork ts NtfSubscription {ntfSubActionTs}
|
||||
| ntfSubActionTs <= ts = pure False
|
||||
rescheduleAction :: AgentMonad m => TMVar () -> UTCTime -> UTCTime -> m Bool
|
||||
rescheduleAction doWork ts actionTs
|
||||
| actionTs <= ts = pure False
|
||||
| otherwise = do
|
||||
void . atomically $ tryTakeTMVar doWork
|
||||
void . forkIO $ do
|
||||
threadDelay $ diffInMicros ntfSubActionTs ts
|
||||
threadDelay $ diffInMicros actionTs ts
|
||||
void . atomically $ tryPutTMVar doWork ()
|
||||
pure True
|
||||
|
||||
@@ -280,7 +281,7 @@ closeNtfSupervisor ns = do
|
||||
|
||||
cancelNtfWorkers_ :: TMap ProtocolServer (TMVar (), Async ()) -> IO ()
|
||||
cancelNtfWorkers_ wsVar = do
|
||||
ws <- atomically $ stateTVar wsVar $ \ws -> (ws, M.empty)
|
||||
ws <- atomically $ stateTVar wsVar (,M.empty)
|
||||
forM_ ws $ uninterruptibleCancel . snd
|
||||
|
||||
getNtfServer :: AgentMonad m => AgentClient -> m (Maybe NtfServer)
|
||||
|
||||
@@ -123,7 +123,7 @@ import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Crypto.Ratchet (RatchetX448, SkippedMsgDiff (..), SkippedMsgKeys)
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Notifications.Client (NtfAgentSubStatus (..), NtfServer, NtfSubAction (..), NtfSubOrSMPAction (..), NtfSubSMPAction (..), NtfSubscription (..), NtfTknAction (..), NtfToken (..))
|
||||
import Simplex.Messaging.Notifications.Client
|
||||
import Simplex.Messaging.Notifications.Protocol (DeviceToken (..), NtfSubscriptionId, NtfTknStatus (..), NtfTokenId, SMPQueueNtf (..))
|
||||
import Simplex.Messaging.Parsers (blobFieldParser, fromTextField_)
|
||||
import Simplex.Messaging.Protocol (MsgBody, MsgFlags, ProtocolServer (..), RcvNtfDhSecret)
|
||||
@@ -719,7 +719,7 @@ getNtfSubscription db connId =
|
||||
db
|
||||
[sql|
|
||||
SELECT s.host, s.port, s.key_hash, ns.ntf_host, ns.ntf_port, ns.ntf_key_hash,
|
||||
nsb.smp_ntf_id, nsb.ntf_sub_id, nsb.ntf_sub_status, nsb.ntf_sub_action_ts
|
||||
nsb.smp_ntf_id, nsb.ntf_sub_id, nsb.ntf_sub_status
|
||||
FROM ntf_subscriptions nsb
|
||||
JOIN servers s ON s.host = nsb.smp_host AND s.port = nsb.smp_port
|
||||
JOIN ntf_servers ns USING (ntf_host, ntf_port)
|
||||
@@ -727,13 +727,13 @@ getNtfSubscription db connId =
|
||||
|]
|
||||
(Only connId)
|
||||
where
|
||||
ntfSubscription (smpHost, smpPort, smpKeyHash, ntfHost, ntfPort, ntfKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, ntfSubActionTs) =
|
||||
ntfSubscription (smpHost, smpPort, smpKeyHash, ntfHost, ntfPort, ntfKeyHash, ntfQueueId, ntfSubId, ntfSubStatus) =
|
||||
let smpServer = SMPServer smpHost smpPort smpKeyHash
|
||||
ntfServer = ProtocolServer ntfHost ntfPort ntfKeyHash
|
||||
in NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus, ntfSubActionTs}
|
||||
in NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus}
|
||||
|
||||
createNtfSubscription :: DB.Connection -> NtfSubscription -> NtfSubOrSMPAction -> IO ()
|
||||
createNtfSubscription db NtfSubscription {connId, smpServer = (SMPServer host port _), ntfQueueId, ntfServer = (SMPServer ntfHost ntfPort _), ntfSubId, ntfSubStatus, ntfSubActionTs} ntfAction =
|
||||
createNtfSubscription :: DB.Connection -> NtfSubActionData -> IO ()
|
||||
createNtfSubscription db NtfSubActionData {action, actionTs, ntfSubscription = NtfSubscription {connId, smpServer = (SMPServer host port _), ntfQueueId, ntfServer = (SMPServer ntfHost ntfPort _), ntfSubId, ntfSubStatus}} =
|
||||
DB.execute
|
||||
db
|
||||
[sql|
|
||||
@@ -743,12 +743,12 @@ createNtfSubscription db NtfSubscription {connId, smpServer = (SMPServer host po
|
||||
VALUES (?,?,?,?,?,?,?,?,?,?,?)
|
||||
|]
|
||||
( (connId, host, port, ntfQueueId, ntfHost, ntfPort, ntfSubId)
|
||||
:. (ntfSubStatus, ntfSubAction, ntfSubSMPAction, ntfSubActionTs)
|
||||
:. (ntfSubStatus, ntfSubAction, ntfSubSMPAction, actionTs)
|
||||
)
|
||||
where
|
||||
(ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction ntfAction
|
||||
(ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction action
|
||||
|
||||
markNtfSubscriptionForDeletion :: DB.Connection -> ConnId -> NtfSubOrSMPAction -> IO ()
|
||||
markNtfSubscriptionForDeletion :: DB.Connection -> ConnId -> NtfSubAction -> IO ()
|
||||
markNtfSubscriptionForDeletion db connId ntfAction = do
|
||||
updatedAt <- getCurrentTime
|
||||
DB.execute
|
||||
@@ -762,8 +762,8 @@ markNtfSubscriptionForDeletion db connId ntfAction = do
|
||||
where
|
||||
(ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction ntfAction
|
||||
|
||||
updateNtfSubscription :: DB.Connection -> ConnId -> NtfSubscription -> NtfSubOrSMPAction -> IO ()
|
||||
updateNtfSubscription db connId NtfSubscription {ntfQueueId, ntfSubId, ntfSubStatus, ntfSubActionTs} ntfAction = do
|
||||
updateNtfSubscription :: DB.Connection -> ConnId -> NtfSubActionData -> IO ()
|
||||
updateNtfSubscription db connId NtfSubActionData {action, actionTs, ntfSubscription = NtfSubscription {ntfQueueId, ntfSubId, ntfSubStatus}} = do
|
||||
r <- maybeFirstRow fromOnly $ DB.query db "SELECT updated_by_supervisor FROM ntf_subscriptions WHERE conn_id = ?" (Only connId)
|
||||
forM_ r $ \updatedBySupervisor -> do
|
||||
updatedAt <- getCurrentTime
|
||||
@@ -785,9 +785,9 @@ updateNtfSubscription db connId NtfSubscription {ntfQueueId, ntfSubId, ntfSubSta
|
||||
SET smp_ntf_id = ?, ntf_sub_id = ?, ntf_sub_status = ?, ntf_sub_action = ?, ntf_sub_smp_action = ?, ntf_sub_action_ts = ?, updated_by_supervisor = ?, updated_at = ?
|
||||
WHERE conn_id = ?
|
||||
|]
|
||||
(ntfQueueId, ntfSubId, ntfSubStatus, ntfSubAction, ntfSubSMPAction, ntfSubActionTs, False, updatedAt, connId)
|
||||
(ntfQueueId, ntfSubId, ntfSubStatus, ntfSubAction, ntfSubSMPAction, actionTs, False, updatedAt, connId)
|
||||
where
|
||||
(ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction ntfAction
|
||||
(ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction action
|
||||
|
||||
setNullNtfSubscriptionAction :: DB.Connection -> ConnId -> IO ()
|
||||
setNullNtfSubscriptionAction db connId = do
|
||||
@@ -802,7 +802,7 @@ setNullNtfSubscriptionAction db connId = do
|
||||
SET ntf_sub_action = ?, ntf_sub_smp_action = ?, ntf_sub_action_ts = ?, updated_by_supervisor = ?, updated_at = ?
|
||||
WHERE conn_id = ?
|
||||
|]
|
||||
(Nothing :: Maybe NtfSubAction, Nothing :: Maybe NtfSubSMPAction, Nothing :: Maybe UTCTime, False, updatedAt, connId)
|
||||
(Nothing :: Maybe NtfSubNTFAction, Nothing :: Maybe NtfSubSMPAction, Nothing :: Maybe UTCTime, False, updatedAt, connId)
|
||||
|
||||
deleteNtfSubscription :: DB.Connection -> ConnId -> IO ()
|
||||
deleteNtfSubscription db connId = do
|
||||
@@ -821,11 +821,11 @@ deleteNtfSubscription db connId = do
|
||||
(Nothing :: Maybe SMP.NotifierId, Nothing :: Maybe NtfSubscriptionId, NASDeleted, False, updatedAt, connId)
|
||||
else DB.execute db "DELETE FROM ntf_subscriptions WHERE conn_id = ?" (Only connId)
|
||||
|
||||
getNextNtfSubAction :: DB.Connection -> NtfServer -> IO (Maybe (NtfSubscription, NtfSubAction))
|
||||
getNextNtfSubAction :: DB.Connection -> NtfServer -> IO (Maybe NtfSubNTFActionData)
|
||||
getNextNtfSubAction db ntfServer@(ProtocolServer ntfHost ntfPort _) = do
|
||||
maybeFirstRow ntfSubscription getNtfSubAction_ $>>= \ntfSub@(NtfSubscription {connId}, _) -> do
|
||||
maybeFirstRow ntfSubscriptionData getNtfSubAction_ $>>= \a@NtfSubNTFActionData {ntfSubscription = NtfSubscription {connId}} -> do
|
||||
DB.execute db "UPDATE ntf_subscriptions SET updated_by_supervisor = ? WHERE conn_id = ?" (False, connId)
|
||||
pure $ Just ntfSub
|
||||
pure $ Just a
|
||||
where
|
||||
getNtfSubAction_ =
|
||||
DB.query
|
||||
@@ -840,15 +840,16 @@ getNextNtfSubAction db ntfServer@(ProtocolServer ntfHost ntfPort _) = do
|
||||
LIMIT 1
|
||||
|]
|
||||
(ntfHost, ntfPort)
|
||||
ntfSubscription (connId, smpHost, smpPort, smpKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, ntfSubActionTs, ntfSubAction) =
|
||||
ntfSubscriptionData (connId, smpHost, smpPort, smpKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, actionTs, ntfAction) =
|
||||
let smpServer = SMPServer smpHost smpPort smpKeyHash
|
||||
in (NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus, ntfSubActionTs}, ntfSubAction)
|
||||
ntfSubscription = NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus}
|
||||
in NtfSubNTFActionData {ntfAction, actionTs, ntfSubscription}
|
||||
|
||||
getNextNtfSubSMPAction :: DB.Connection -> SMPServer -> IO (Maybe (NtfSubscription, NtfSubSMPAction))
|
||||
getNextNtfSubSMPAction :: DB.Connection -> SMPServer -> IO (Maybe NtfSubSMPActionData)
|
||||
getNextNtfSubSMPAction db smpServer@(SMPServer smpHost smpPort _) = do
|
||||
maybeFirstRow ntfSubscription getNtfSubAction_ $>>= \ntfSub@(NtfSubscription {connId}, _) -> do
|
||||
maybeFirstRow ntfSubscriptionData getNtfSubAction_ $>>= \a@NtfSubSMPActionData {ntfSubscription = NtfSubscription {connId}} -> do
|
||||
DB.execute db "UPDATE ntf_subscriptions SET updated_by_supervisor = ? WHERE conn_id = ?" (False, connId)
|
||||
pure $ Just ntfSub
|
||||
pure $ Just a
|
||||
where
|
||||
getNtfSubAction_ =
|
||||
DB.query
|
||||
@@ -858,14 +859,15 @@ getNextNtfSubSMPAction db smpServer@(SMPServer smpHost smpPort _) = do
|
||||
ns.smp_ntf_id, ns.ntf_sub_id, ns.ntf_sub_status, ns.ntf_sub_action_ts, ns.ntf_sub_smp_action
|
||||
FROM ntf_subscriptions ns
|
||||
JOIN ntf_servers s USING (ntf_host, ntf_port)
|
||||
WHERE ns.smp_host = ? AND ns.smp_port = ? AND ns.ntf_sub_smp_action IS NOT NULL
|
||||
WHERE ns.smp_host = ? AND ns.smp_port = ? AND ns.ntf_sub_smp_action IS NOT NULL AND ns.ntf_sub_action_ts IS NOT NULL
|
||||
ORDER BY ns.ntf_sub_action_ts ASC
|
||||
LIMIT 1
|
||||
|]
|
||||
(smpHost, smpPort)
|
||||
ntfSubscription (connId, ntfHost, ntfPort, ntfKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, ntfSubActionTs, ntfSubAction) =
|
||||
ntfSubscriptionData (connId, ntfHost, ntfPort, ntfKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, actionTs, smpAction) =
|
||||
let ntfServer = ProtocolServer ntfHost ntfPort ntfKeyHash
|
||||
in (NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus, ntfSubActionTs}, ntfSubAction)
|
||||
ntfSubscription = NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus}
|
||||
in NtfSubSMPActionData {ntfSubscription, smpAction, actionTs}
|
||||
|
||||
getActiveNtfToken :: DB.Connection -> IO (Maybe NtfToken)
|
||||
getActiveNtfToken db =
|
||||
@@ -1311,6 +1313,6 @@ createWithRandomId gVar create = tryCreate 3
|
||||
randomId :: TVar ChaChaDRG -> Int -> IO ByteString
|
||||
randomId gVar n = U.encode <$> (atomically . stateTVar gVar $ randomBytesGenerate n)
|
||||
|
||||
ntfSubAndSMPAction :: NtfSubOrSMPAction -> (Maybe NtfSubAction, Maybe NtfSubSMPAction)
|
||||
ntfSubAndSMPAction (NtfSubAction nsa) = (Just nsa, Nothing)
|
||||
ntfSubAndSMPAction :: NtfSubAction -> (Maybe NtfSubNTFAction, Maybe NtfSubSMPAction)
|
||||
ntfSubAndSMPAction (NtfSubNTFAction nsa) = (Just nsa, Nothing)
|
||||
ntfSubAndSMPAction (NtfSubSMPAction nsa) = (Nothing, Just nsa)
|
||||
|
||||
@@ -35,7 +35,7 @@ CREATE TABLE ntf_subscriptions (
|
||||
ntf_port TEXT NOT NULL,
|
||||
ntf_sub_id BLOB,
|
||||
ntf_sub_status TEXT NOT NULL, -- see NtfAgentSubStatus
|
||||
ntf_sub_action TEXT, -- if there is an action required on this subscription: NtfSubAction
|
||||
ntf_sub_action TEXT, -- if there is an action required on this subscription: NtfSubNTFAction
|
||||
ntf_sub_smp_action TEXT, -- action with SMP server: NtfSubSMPAction; only one of this and ntf_sub_action can (should) be not null in same record
|
||||
ntf_sub_action_ts TEXT, -- the earliest time for the action, e.g. checks can be scheduled every X hours
|
||||
updated_by_supervisor INTEGER NOT NULL DEFAULT 0, -- to be checked on updates by workers to not overwrite supervisor command (state still should be updated)
|
||||
|
||||
@@ -180,7 +180,7 @@ CREATE TABLE ntf_subscriptions(
|
||||
ntf_port TEXT NOT NULL,
|
||||
ntf_sub_id BLOB,
|
||||
ntf_sub_status TEXT NOT NULL, -- see NtfAgentSubStatus
|
||||
ntf_sub_action TEXT, -- if there is an action required on this subscription: NtfSubAction
|
||||
ntf_sub_action TEXT, -- if there is an action required on this subscription: NtfSubNTFAction
|
||||
ntf_sub_smp_action TEXT, -- action with SMP server: NtfSubSMPAction; only one of this and ntf_sub_action can(should) be not null in same record
|
||||
ntf_sub_action_ts TEXT, -- the earliest time for the action, e.g. checks can be scheduled every X hours
|
||||
updated_by_supervisor INTEGER NOT NULL DEFAULT 0, -- to be checked on updates by workers to not overwrite supervisor command(state still should be updated)
|
||||
|
||||
@@ -136,15 +136,27 @@ newNtfToken deviceToken ntfServer (ntfPubKey, ntfPrivKey) ntfDhKeys ntfMode =
|
||||
ntfMode
|
||||
}
|
||||
|
||||
data NtfSubOrSMPAction = NtfSubAction NtfSubAction | NtfSubSMPAction NtfSubSMPAction
|
||||
data NtfSubAction = NtfSubNTFAction NtfSubNTFAction | NtfSubSMPAction NtfSubSMPAction
|
||||
|
||||
data NtfSubAction
|
||||
data NtfSubActionData = NtfSubActionData
|
||||
{ action :: NtfSubAction,
|
||||
actionTs :: UTCTime,
|
||||
ntfSubscription :: NtfSubscription
|
||||
}
|
||||
|
||||
data NtfSubNTFActionData = NtfSubNTFActionData
|
||||
{ ntfAction :: NtfSubNTFAction,
|
||||
actionTs :: UTCTime,
|
||||
ntfSubscription :: NtfSubscription
|
||||
}
|
||||
|
||||
data NtfSubNTFAction
|
||||
= NSACreate
|
||||
| NSACheck
|
||||
| NSADelete
|
||||
deriving (Show)
|
||||
|
||||
instance Encoding NtfSubAction where
|
||||
instance Encoding NtfSubNTFAction where
|
||||
smpEncode = \case
|
||||
NSACreate -> "N"
|
||||
NSACheck -> "C"
|
||||
@@ -154,11 +166,17 @@ instance Encoding NtfSubAction where
|
||||
'N' -> pure NSACreate
|
||||
'C' -> pure NSACheck
|
||||
'D' -> pure NSADelete
|
||||
_ -> fail "bad NtfSubAction"
|
||||
_ -> fail "bad NtfSubNTFAction"
|
||||
|
||||
instance FromField NtfSubAction where fromField = blobFieldDecoder smpDecode
|
||||
instance FromField NtfSubNTFAction where fromField = blobFieldDecoder smpDecode
|
||||
|
||||
instance ToField NtfSubAction where toField = toField . smpEncode
|
||||
instance ToField NtfSubNTFAction where toField = toField . smpEncode
|
||||
|
||||
data NtfSubSMPActionData = NtfSubSMPActionData
|
||||
{ smpAction :: NtfSubSMPAction,
|
||||
actionTs :: UTCTime,
|
||||
ntfSubscription :: NtfSubscription
|
||||
}
|
||||
|
||||
data NtfSubSMPAction
|
||||
= NSASmpKey
|
||||
@@ -222,19 +240,17 @@ data NtfSubscription = NtfSubscription
|
||||
ntfQueueId :: Maybe NotifierId,
|
||||
ntfServer :: NtfServer,
|
||||
ntfSubId :: Maybe NtfSubscriptionId,
|
||||
ntfSubStatus :: NtfAgentSubStatus,
|
||||
ntfSubActionTs :: UTCTime
|
||||
ntfSubStatus :: NtfAgentSubStatus
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
newNtfSubscription :: ConnId -> SMPServer -> Maybe NotifierId -> NtfServer -> NtfAgentSubStatus -> UTCTime -> NtfSubscription
|
||||
newNtfSubscription connId smpServer ntfQueueId ntfServer ntfSubStatus ntfSubActionTs =
|
||||
newNtfSubscription :: ConnId -> SMPServer -> Maybe NotifierId -> NtfServer -> NtfAgentSubStatus -> NtfSubscription
|
||||
newNtfSubscription connId smpServer ntfQueueId ntfServer ntfSubStatus =
|
||||
NtfSubscription
|
||||
{ connId,
|
||||
smpServer,
|
||||
ntfQueueId,
|
||||
ntfServer,
|
||||
ntfSubId = Nothing,
|
||||
ntfSubStatus,
|
||||
ntfSubActionTs
|
||||
ntfSubStatus
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user