ntf: batch smp worker command processing (#1331)

* ntf: refactor smp worker (separate command processing into steps)

---------

Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>
This commit is contained in:
spaced4ndy
2024-09-25 15:00:06 +04:00
committed by GitHub
parent 1f67b403de
commit 81fcdf8ac9
4 changed files with 178 additions and 95 deletions
+32 -14
View File
@@ -56,8 +56,10 @@ module Simplex.Messaging.Agent.Client
secureQueue,
secureSndQueue,
enableQueueNotifications,
EnableQueueNtfReq (..),
enableQueuesNtfs,
disableQueueNotifications,
DisableQueueNtfReq,
disableQueuesNtfs,
sendAgentMessage,
getQueueInfo,
@@ -257,8 +259,8 @@ import Simplex.Messaging.Protocol
VersionSMPC,
XFTPServer,
XFTPServerWithAuth,
pattern NoEntity,
sameSrvAddr',
pattern NoEntity,
)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Server.QueueStore.QueueInfo
@@ -1606,24 +1608,39 @@ enableQueueNotifications c rq@RcvQueue {rcvId, rcvPrivateKey} notifierKey rcvNtf
withSMPClient c rq "NKEY <nkey>" $ \smp ->
enableSMPQueueNotifications smp rcvPrivateKey rcvId notifierKey rcvNtfPublicDhKey
type RcvQueueNtf = (RcvQueue, SMP.NtfPublicAuthKey, SMP.RcvNtfPublicDhKey)
data EnableQueueNtfReq = EnableQueueNtfReq
{ eqnrNtfSub :: NtfSubscription,
eqnrRq :: RcvQueue,
eqnrAuthKeyPair :: C.AAuthKeyPair,
eqnrRcvKeyPair :: C.KeyPairX25519
}
enableQueuesNtfs :: AgentClient -> [RcvQueueNtf] -> AM' [(RcvQueueNtf, Either AgentErrorType (SMP.NotifierId, SMP.RcvNtfPublicDhKey))]
enableQueuesNtfs = sendTSessionBatches "NKEY" fst3 enableQueues_
enableQueuesNtfs :: AgentClient -> [EnableQueueNtfReq] -> AM' [(EnableQueueNtfReq, Either AgentErrorType (SMP.NotifierId, SMP.RcvNtfPublicDhKey))]
enableQueuesNtfs = sendTSessionBatches "NKEY" eqnrRq enableQueues_
where
fst3 (x, _, _) = x
enableQueues_ :: SMPClient -> NonEmpty RcvQueueNtf -> IO (NonEmpty (RcvQueueNtf, Either (ProtocolClientError ErrorType) (SMP.NotifierId, RcvNtfPublicDhKey)))
enableQueues_ smp qs' = L.zipWith (,) qs' <$> enableSMPQueuesNtfs smp (L.map queueCreds qs')
queueCreds :: RcvQueueNtf -> (SMP.RcvPrivateAuthKey, SMP.RecipientId, SMP.NtfPublicAuthKey, SMP.RcvNtfPublicDhKey)
queueCreds (RcvQueue {rcvPrivateKey, rcvId}, notifierKey, rcvNtfPublicDhKey) = (rcvPrivateKey, rcvId, notifierKey, rcvNtfPublicDhKey)
enableQueues_ :: SMPClient -> NonEmpty EnableQueueNtfReq -> IO (NonEmpty (EnableQueueNtfReq, Either (ProtocolClientError ErrorType) (SMP.NotifierId, RcvNtfPublicDhKey)))
enableQueues_ smp qs' = L.zip qs' <$> enableSMPQueuesNtfs smp (L.map queueCreds qs')
queueCreds :: EnableQueueNtfReq -> (SMP.RcvPrivateAuthKey, SMP.RecipientId, SMP.NtfPublicAuthKey, SMP.RcvNtfPublicDhKey)
queueCreds EnableQueueNtfReq {eqnrRq, eqnrAuthKeyPair, eqnrRcvKeyPair} =
let RcvQueue {rcvPrivateKey, rcvId} = eqnrRq
(ntfPublicKey, _) = eqnrAuthKeyPair
(rcvNtfPubDhKey, _) = eqnrRcvKeyPair
in (rcvPrivateKey, rcvId, ntfPublicKey, rcvNtfPubDhKey)
disableQueueNotifications :: AgentClient -> RcvQueue -> AM ()
disableQueueNotifications c rq@RcvQueue {rcvId, rcvPrivateKey} =
withSMPClient c rq "NDEL" $ \smp ->
disableSMPQueueNotifications smp rcvPrivateKey rcvId
disableQueuesNtfs :: AgentClient -> [RcvQueue] -> AM' [(RcvQueue, Either AgentErrorType ())]
disableQueuesNtfs = sendTSessionBatches "NDEL" id $ sendBatch disableSMPQueuesNtfs
type DisableQueueNtfReq = (NtfSubscription, RcvQueue)
disableQueuesNtfs :: AgentClient -> [DisableQueueNtfReq] -> AM' [(DisableQueueNtfReq, Either AgentErrorType ())]
disableQueuesNtfs = sendTSessionBatches "NDEL" snd disableQueues_
where
disableQueues_ :: SMPClient -> NonEmpty DisableQueueNtfReq -> IO (NonEmpty (DisableQueueNtfReq, Either (ProtocolClientError ErrorType) ()))
disableQueues_ smp qs' = L.zip qs' <$> disableSMPQueuesNtfs smp (L.map queueCreds qs')
queueCreds :: DisableQueueNtfReq -> (SMP.RcvPrivateAuthKey, SMP.RecipientId)
queueCreds (_, RcvQueue {rcvPrivateKey, rcvId}) = (rcvPrivateKey, rcvId)
sendAck :: AgentClient -> RcvQueue -> MsgId -> AM ()
sendAck c rq@RcvQueue {rcvId, rcvPrivateKey} msgId = do
@@ -1830,11 +1847,12 @@ withWorkItems c doWork getWork action = do
Just items' -> action items'
Nothing -> do
let criticalErr = find workItemError errs
forM_ criticalErr $ \ err -> do
forM_ criticalErr $ \err -> do
notifyErr (CRITICAL False) err
when (all workItemError errs) noWork
unless (null errs) $ atomically $
writeTBQueue (subQ c) ("", "", AEvt SAENone $ ERRS $ map (\e -> ("", INTERNAL $ show e)) errs)
unless (null errs) $
atomically $
writeTBQueue (subQ c) ("", "", AEvt SAENone $ ERRS $ map (\e -> ("", INTERNAL $ show e)) errs)
Left e
| workItemError e -> noWork >> notifyErr (CRITICAL False) e
| otherwise -> notifyErr INTERNAL e
+3 -1
View File
@@ -150,6 +150,7 @@ data AgentConfig = AgentConfig
xftpMaxRecipientsPerRequest :: Int,
deleteErrorCount :: Int,
ntfCron :: Word16,
ntfBatchSize :: Int,
ntfSubCheckInterval :: NominalDiffTime,
caCertificateFile :: FilePath,
privateKeyFile :: FilePath,
@@ -219,6 +220,7 @@ defaultAgentConfig =
xftpMaxRecipientsPerRequest = 200,
deleteErrorCount = 10,
ntfCron = 20, -- minutes
ntfBatchSize = 200,
ntfSubCheckInterval = nominalDay,
-- CA certificate private key is not needed for initialization
-- ! we do not generate these
@@ -259,7 +261,7 @@ data NtfSupervisor = NtfSupervisor
ntfSMPWorkers :: TMap SMPServer Worker
}
data NtfSupervisorCommand = NSCCreate | NSCSmpDelete | NSCNtfWorker NtfServer | NSCNtfSMPWorker SMPServer | NSCDeleteSub
data NtfSupervisorCommand = NSCCreate | NSCSmpDelete | NSCDeleteSub
deriving (Show)
newNtfSubSupervisor :: Natural -> IO NtfSupervisor
+129 -66
View File
@@ -23,6 +23,7 @@ import Control.Logger.Simple (logError, logInfo)
import Control.Monad
import Control.Monad.Reader
import Control.Monad.Trans.Except
import Crypto.Random (ChaChaDRG)
import Data.Bifunctor (first)
import Data.Either (partitionEithers)
import Data.Foldable (foldr')
@@ -45,6 +46,7 @@ import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Notifications.Protocol (NtfSubStatus (..), NtfTknStatus (..), SMPQueueNtf (..))
import Simplex.Messaging.Notifications.Types
import Simplex.Messaging.Protocol (NtfServer, sameSrvAddr)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Util (diffToMicroseconds, threadDelay', tshow, unlessM)
import System.Random (randomR)
import UnliftIO
@@ -71,13 +73,16 @@ partitionErrs :: (a -> ConnId) -> [a] -> [Either AgentErrorType b] -> ([(ConnId,
partitionErrs f xs = partitionEithers . zipWith (\x -> first (f x,)) xs
{-# INLINE partitionErrs #-}
ntfSubConnId :: NtfSubscription -> ConnId
ntfSubConnId NtfSubscription {connId} = connId
processNtfCmd :: AgentClient -> (NtfSupervisorCommand, NonEmpty ConnId) -> AM ()
processNtfCmd c (cmd, connIds) = do
logInfo $ "processNtfCmd - cmd = " <> tshow cmd
let connIds' = L.toList connIds
case cmd of
NSCCreate -> do
(cErrs, rqSubActions) <- lift $ partitionErrs id connIds' <$> (withStoreBatch c $ \db -> map (getQueueSub db) connIds')
(cErrs, rqSubActions) <- lift $ partitionErrs id connIds' <$> withStoreBatch c (\db -> map (getQueueSub db) connIds')
notifyErrs c cErrs
logInfo $ "processNtfCmd, NSCCreate - length rqSubs = " <> tshow (length rqSubActions)
let (ns, rs, css, cns) = partitionQueueSubActions rqSubActions
@@ -99,7 +104,7 @@ processNtfCmd c (cmd, connIds) = do
createNewSubs rqs = do
withTokenServer $ \ntfServer -> do
let newSubs = map (rqToNewSub ntfServer) rqs
(cErrs, _) <- lift $ partitionErrs ntfSubConnId newSubs <$> (withStoreBatch c $ \db -> map (storeNewSub db) newSubs)
(cErrs, _) <- lift $ partitionErrs ntfSubConnId newSubs <$> withStoreBatch c (\db -> map (storeNewSub db) newSubs)
notifyErrs c cErrs
kickSMPWorkers rqs
where
@@ -111,7 +116,7 @@ processNtfCmd c (cmd, connIds) = do
resetSubs rqSubs = do
withTokenServer $ \ntfServer -> do
let subsToReset = map (toResetSub ntfServer) rqSubs
(cErrs, _) <- lift $ partitionErrs ntfSubConnId subsToReset <$> (withStoreBatch' c $ \db -> map (storeResetSub db) subsToReset)
(cErrs, _) <- lift $ partitionErrs ntfSubConnId subsToReset <$> withStoreBatch' c (\db -> map (storeResetSub db) subsToReset)
notifyErrs c cErrs
let rqs = map fst rqSubs
kickSMPWorkers rqs
@@ -122,7 +127,6 @@ processNtfCmd c (cmd, connIds) = do
in sub {smpServer, ntfQueueId = Nothing, ntfServer, ntfSubId = Nothing, ntfSubStatus = NASNew}
storeResetSub :: DB.Connection -> NtfSubscription -> IO ()
storeResetSub db sub = supervisorUpdateNtfSub db sub (NSASMP NSASmpKey)
ntfSubConnId NtfSubscription {connId} = connId
partitionQueueSubActions ::
[(RcvQueue, Maybe NtfSupervisorSub)] ->
( [RcvQueue], -- new subs
@@ -157,9 +161,9 @@ processNtfCmd c (cmd, connIds) = do
NSANtf _ -> (ns, rs, css, subNtfServer : cns)
reset = (ns, (rq, sub) : rs, css, cns)
NSCSmpDelete -> do
(cErrs, rqs) <- lift $ partitionErrs id connIds' <$> (withStoreBatch c $ \db -> map (getQueue db) connIds')
(cErrs, rqs) <- lift $ partitionErrs id connIds' <$> withStoreBatch c (\db -> map (getQueue db) connIds')
logInfo $ "processNtfCmd, NSCSmpDelete - length rqs = " <> tshow (length rqs)
(cErrs', _) <- lift $ partitionErrs qConnId rqs <$> (withStoreBatch' c $ \db -> map (updateAction db) rqs)
(cErrs', _) <- lift $ partitionErrs qConnId rqs <$> withStoreBatch' c (\db -> map (updateAction db) rqs)
notifyErrs c (cErrs <> cErrs')
kickSMPWorkers rqs
where
@@ -167,8 +171,6 @@ processNtfCmd c (cmd, connIds) = do
getQueue db connId = first storeError <$> getPrimaryRcvQueue db connId
updateAction :: DB.Connection -> RcvQueue -> IO ()
updateAction db rq = supervisorUpdateNtfAction db (qConnId rq) (NSASMP NSASmpDelete)
NSCNtfWorker ntfServer -> lift . void $ getNtfNTFWorker True c ntfServer
NSCNtfSMPWorker smpServer -> lift . void $ getNtfSMPWorker True c smpServer
NSCDeleteSub -> void $ lift $ withStoreBatch' c $ \db -> map (deleteNtfSubscription' db) connIds'
where
kickSMPWorkers :: [RcvQueue] -> AM ()
@@ -236,8 +238,7 @@ runNtfWorker c srv Worker {doWork} =
NSAuth -> do
withStore' c $ \db ->
updateNtfSubscription db sub {ntfServer, ntfQueueId = Nothing, ntfSubId = Nothing, ntfSubStatus = NASNew} (NSASMP NSASmpKey) ts
ns <- asks ntfSupervisor
atomically $ writeTBQueue (ntfSubQ ns) (NSCNtfSMPWorker smpServer, [connId])
lift . void $ getNtfSMPWorker True c smpServer
status -> updateSubNextCheck ts status
atomically $ incNtfServerStat c userId ntfServer ntfChecked
Nothing -> workerInternalError c connId "NSACheck - no subscription ID"
@@ -247,8 +248,7 @@ runNtfWorker c srv Worker {doWork} =
deleteNtfSub $ do
let sub' = sub {ntfSubId = Nothing, ntfSubStatus = NASOff}
withStore' c $ \db -> updateNtfSubscription db sub' (NSASMP NSASmpDelete) ts
ns <- asks ntfSupervisor
atomically $ writeTBQueue (ntfSubQ ns) (NSCNtfSMPWorker smpServer, [connId]) -- TODO [batch ntf] loop
lift . void $ getNtfSMPWorker True c smpServer
NSARotate ->
deleteNtfSub $ do
withStore' c $ \db -> deleteNtfSubscription db connId
@@ -276,51 +276,102 @@ runNtfWorker c srv Worker {doWork} =
updateNtfSubscription db sub {ntfSubStatus = toStatus} toAction actionTs'
runNtfSMPWorker :: AgentClient -> SMPServer -> Worker -> AM ()
runNtfSMPWorker c srv Worker {doWork} = do
env <- ask
forever $ do
waitForWork doWork
ExceptT . liftIO . agentOperationBracket c AONtfNetwork throwWhenInactive $
runReaderT (runExceptT runNtfSMPOperation) env
runNtfSMPWorker c srv Worker {doWork} = forever $ do
waitForWork doWork
ExceptT $ agentOperationBracket c AONtfNetwork throwWhenInactive $ runExceptT runNtfSMPOperation
where
runNtfSMPOperation =
withWork c doWork (`getNextNtfSubSMPAction` srv) $
\nextSub@(NtfSubscription {connId}, _, _) -> do
logInfo $ "runNtfSMPWorker, nextSub " <> tshow nextSub
ri <- asks $ reconnectInterval . config
withRetryInterval ri $ \_ loop -> do
liftIO $ waitWhileSuspended c
liftIO $ waitForUserNetwork c
processSub nextSub
`catchAgentError` retryOnError c "NtfSMPWorker" loop (workerInternalError c connId . show)
processSub :: (NtfSubscription, NtfSubSMPAction, NtfActionTs) -> AM ()
processSub (sub@NtfSubscription {connId, ntfServer}, smpAction, actionTs) = do
ts <- liftIO getCurrentTime
unlessM (lift $ rescheduleAction doWork ts actionTs) $
case smpAction of
NSASmpKey ->
lift getNtfToken >>= \case
Just NtfToken {ntfTknStatus = NTActive, ntfMode = NMInstant} -> do
rq <- withStore c (`getPrimaryRcvQueue` connId)
C.AuthAlg a <- asks (rcvAuthAlg . config)
g <- asks random
(ntfPublicKey, ntfPrivateKey) <- atomically $ C.generateAuthKeyPair a g
(rcvNtfPubDhKey, rcvNtfPrivDhKey) <- atomically $ C.generateKeyPair g
(notifierId, rcvNtfSrvPubDhKey) <- enableQueueNotifications c rq ntfPublicKey rcvNtfPubDhKey
let rcvNtfDhSecret = C.dh' rcvNtfSrvPubDhKey rcvNtfPrivDhKey
withStore' c $ \db -> do
setRcvQueueNtfCreds db connId $ Just ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret}
updateNtfSubscription db sub {ntfQueueId = Just notifierId, ntfSubStatus = NASKey} (NSANtf NSACreate) ts
ns <- asks ntfSupervisor
atomically $ sendNtfSubCommand ns (NSCNtfWorker ntfServer, [connId])
_ -> workerInternalError c connId "NSASmpKey - no active token"
NSASmpDelete -> do
-- TODO should we remove it after successful removal from the server?
rq_ <- withStore' c $ \db -> do
setRcvQueueNtfCreds db connId Nothing
getPrimaryRcvQueue db connId
mapM_ (disableQueueNotifications c) rq_
withStore' c $ \db -> deleteNtfSubscription db connId
runNtfSMPOperation :: AM ()
runNtfSMPOperation = do
ntfBatchSize <- asks $ ntfBatchSize . config
withWorkItems c doWork (\db -> getNextNtfSubSMPActions db srv ntfBatchSize) $ \nextSubs -> do
logInfo $ "runNtfSMPWorker - length nextSubs = " <> tshow (length nextSubs)
let (creates, deletes) = splitActions nextSubs
retrySubActions creates createNotifierKeys
retrySubActions deletes deleteNotifierKeys
splitActions :: NonEmpty (NtfSubSMPAction, NtfSubscription) -> ([NtfSubscription], [NtfSubscription])
splitActions = foldr addAction ([], [])
where
addAction action (creates, deletes) = case action of
(NSASmpKey, sub) -> (sub : creates, deletes)
(NSASmpDelete, sub) -> (creates, sub : deletes)
retrySubActions :: [NtfSubscription] -> ([NtfSubscription] -> AM' [NtfSubscription]) -> AM ()
retrySubActions subs action = do
v <- newTVarIO subs
ri <- asks $ reconnectInterval . config
withRetryInterval ri $ \_ loop -> do
liftIO $ waitWhileSuspended c
liftIO $ waitForUserNetwork c
subs' <- readTVarIO v
retrySubs <- lift $ action subs'
unless (null retrySubs) $ do
atomically $ writeTVar v retrySubs
retryNetworkLoop c loop
createNotifierKeys :: [NtfSubscription] -> AM' [NtfSubscription]
createNotifierKeys ntfSubs =
getNtfToken >>= \case
Just NtfToken {ntfTknStatus = NTActive, ntfMode = NMInstant} -> do
(errs1, subRqKeys) <- prepareQueueSmpKey ntfSubs
rs <- enableQueuesNtfs c subRqKeys
let (subRqKeys', errs2, successes) = splitResults rs
ntfSubs' = map eqnrNtfSub subRqKeys'
errs2' = map (first (qConnId . eqnrRq)) errs2
ts <- liftIO getCurrentTime
(errs3, srvs) <- partitionErrs (qConnId . eqnrRq . fst) successes <$> withStoreBatch' c (\db -> map (storeNtfSubCreds db ts) successes)
mapM_ (getNtfNTFWorker True c) $ S.fromList srvs
workerErrors c $ errs1 <> errs2' <> errs3
pure ntfSubs'
_ -> do
let errs = map (\sub -> (ntfSubConnId sub, INTERNAL "NSASmpKey - no active token")) ntfSubs
workerErrors c errs
pure []
where
prepareQueueSmpKey :: [NtfSubscription] -> AM' ([(ConnId, AgentErrorType)], [EnableQueueNtfReq])
prepareQueueSmpKey subs = do
alg <- asks (rcvAuthAlg . config)
g <- asks random
partitionErrs ntfSubConnId subs <$> withStoreBatch c (\db -> map (getQueue db alg g) subs)
where
getQueue :: DB.Connection -> C.AuthAlg -> TVar ChaChaDRG -> NtfSubscription -> IO (Either AgentErrorType EnableQueueNtfReq)
getQueue db (C.AuthAlg a) g sub = fmap (first storeError) $ runExceptT $ do
rq <- ExceptT $ getPrimaryRcvQueue db (ntfSubConnId sub)
authKeyPair <- atomically $ C.generateAuthKeyPair a g
rcvNtfKeyPair <- atomically $ C.generateKeyPair g
pure (EnableQueueNtfReq sub rq authKeyPair rcvNtfKeyPair)
storeNtfSubCreds :: DB.Connection -> UTCTime -> (EnableQueueNtfReq, (SMP.NotifierId, SMP.RcvNtfPublicDhKey)) -> IO NtfServer
storeNtfSubCreds db ts (EnableQueueNtfReq {eqnrNtfSub, eqnrAuthKeyPair = (ntfPublicKey, ntfPrivateKey), eqnrRcvKeyPair = (_, pk)}, (notifierId, srvPubDhKey)) = do
let NtfSubscription {ntfServer} = eqnrNtfSub
rcvNtfDhSecret = C.dh' srvPubDhKey pk
setRcvQueueNtfCreds db (ntfSubConnId eqnrNtfSub) $ Just ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret}
updateNtfSubscription db eqnrNtfSub {ntfQueueId = Just notifierId, ntfSubStatus = NASKey} (NSANtf NSACreate) ts
pure ntfServer
deleteNotifierKeys :: [NtfSubscription] -> AM' [NtfSubscription]
deleteNotifierKeys ntfSubs = do
(errs1, subRqs) <- partitionErrs ntfSubConnId ntfSubs <$> withStoreBatch c (\db -> map (resetCredsGetQueue db) ntfSubs)
rs <- disableQueuesNtfs c subRqs
let (subRqs', errs2, successes) = splitResults rs
ntfSubs' = map fst subRqs'
errs2' = map (first (qConnId . snd)) errs2
disabledRqs = map (snd . fst) successes
(errs3, _) <- partitionErrs qConnId disabledRqs <$> withStoreBatch' c (\db -> map (deleteSub db) disabledRqs)
workerErrors c $ errs1 <> errs2' <> errs3
pure ntfSubs'
where
resetCredsGetQueue :: DB.Connection -> NtfSubscription -> IO (Either AgentErrorType DisableQueueNtfReq)
resetCredsGetQueue db sub@NtfSubscription {connId} = fmap (first storeError) $ runExceptT $ do
liftIO $ setRcvQueueNtfCreds db connId Nothing
rq <- ExceptT $ getPrimaryRcvQueue db connId
pure (sub, rq)
deleteSub :: DB.Connection -> RcvQueue -> IO ()
deleteSub db rq = deleteNtfSubscription db (qConnId rq)
-- (temporary errs, other errs, successes)
splitResults :: [(a, Either AgentErrorType r)] -> ([a], [(a, AgentErrorType)], [(a, r)])
splitResults = foldr' addRes ([], [], [])
where
addRes (a, r_) (as, errs, rs) = case r_ of
Right r -> (as, errs, (a, r) : rs)
Left e
| tempErr e -> (a : as, errs, rs)
| otherwise -> (as, (a, e) : errs, rs)
rescheduleAction :: TMVar () -> UTCTime -> UTCTime -> AM' Bool
rescheduleAction doWork ts actionTs
@@ -335,16 +386,28 @@ rescheduleAction doWork ts actionTs
retryOnError :: AgentClient -> Text -> AM () -> (AgentErrorType -> AM ()) -> AgentErrorType -> AM ()
retryOnError c name loop done e = do
logError $ name <> " error: " <> tshow e
case e of
BROKER _ NETWORK -> retryLoop
BROKER _ TIMEOUT -> retryLoop
_ -> done e
where
retryLoop = do
atomically $ endAgentOperation c AONtfNetwork
liftIO $ throwWhenInactive c
atomically $ beginAgentOperation c AONtfNetwork
loop
if tempErr e
then retryNetworkLoop c loop
else done e
tempErr :: AgentErrorType -> Bool
tempErr = \case
BROKER _ NETWORK -> True
BROKER _ TIMEOUT -> True
_ -> False
retryNetworkLoop :: AgentClient -> AM () -> AM ()
retryNetworkLoop c loop = do
atomically $ endAgentOperation c AONtfNetwork
liftIO $ throwWhenInactive c
atomically $ beginAgentOperation c AONtfNetwork
loop
workerErrors :: AgentClient -> [(ConnId, AgentErrorType)] -> AM' ()
workerErrors c connErrs =
unless (null connErrs) $ do
void $ withStoreBatch' c (\db -> map (setNullNtfSubscriptionAction db . fst) connErrs)
notifyErrs c connErrs
workerInternalError :: AgentClient -> ConnId -> String -> AM ()
workerInternalError c connId internalErrStr = do
+14 -14
View File
@@ -162,7 +162,7 @@ module Simplex.Messaging.Agent.Store.SQLite
deleteNtfSubscription',
getNextNtfSubNTFAction,
markNtfSubActionNtfFailed_, -- exported for tests
getNextNtfSubSMPAction,
getNextNtfSubSMPActions,
markNtfSubActionSMPFailed_, -- exported for tests
getActiveNtfToken,
getNtfRcvQueue,
@@ -1671,14 +1671,14 @@ markNtfSubActionNtfFailed_ :: DB.Connection -> ConnId -> IO ()
markNtfSubActionNtfFailed_ db connId =
DB.execute db "UPDATE ntf_subscriptions SET ntf_failed = 1 where conn_id = ?" (Only connId)
getNextNtfSubSMPAction :: DB.Connection -> SMPServer -> IO (Either StoreError (Maybe (NtfSubscription, NtfSubSMPAction, NtfActionTs)))
getNextNtfSubSMPAction db smpServer@(SMPServer smpHost smpPort _) =
getWorkItem "ntf SMP" getNtfConnId getNtfSubAction (markNtfSubActionSMPFailed_ db)
getNextNtfSubSMPActions :: DB.Connection -> SMPServer -> Int -> IO (Either StoreError [Either StoreError (NtfSubSMPAction, NtfSubscription)])
getNextNtfSubSMPActions db smpServer@(SMPServer smpHost smpPort _) ntfBatchSize =
getWorkItems "ntf SMP" getNtfConnIds getNtfSubAction (markNtfSubActionSMPFailed_ db)
where
getNtfConnId :: IO (Maybe ConnId)
getNtfConnId =
maybeFirstRow fromOnly $
DB.query
getNtfConnIds :: IO [ConnId]
getNtfConnIds =
map fromOnly
<$> DB.query
db
[sql|
SELECT conn_id
@@ -1686,10 +1686,10 @@ getNextNtfSubSMPAction db smpServer@(SMPServer smpHost smpPort _) =
WHERE smp_host = ? AND smp_port = ? AND ntf_sub_smp_action IS NOT NULL AND ntf_sub_action_ts IS NOT NULL
AND (smp_failed = 0 OR updated_by_supervisor = 1)
ORDER BY ntf_sub_action_ts ASC
LIMIT 1
LIMIT ?
|]
(smpHost, smpPort)
getNtfSubAction :: ConnId -> IO (Either StoreError (NtfSubscription, NtfSubSMPAction, NtfActionTs))
(smpHost, smpPort, ntfBatchSize)
getNtfSubAction :: ConnId -> IO (Either StoreError (NtfSubSMPAction, NtfSubscription))
getNtfSubAction connId = do
markUpdatedByWorker db connId
firstRow ntfSubAction err $
@@ -1697,7 +1697,7 @@ getNextNtfSubSMPAction db smpServer@(SMPServer smpHost smpPort _) =
db
[sql|
SELECT c.user_id, s.ntf_host, s.ntf_port, s.ntf_key_hash,
ns.smp_ntf_id, ns.ntf_sub_id, ns.ntf_sub_status, ns.ntf_sub_action_ts, ns.ntf_sub_smp_action
ns.smp_ntf_id, ns.ntf_sub_id, ns.ntf_sub_status, ns.ntf_sub_smp_action
FROM ntf_subscriptions ns
JOIN connections c USING (conn_id)
JOIN ntf_servers s USING (ntf_host, ntf_port)
@@ -1706,10 +1706,10 @@ getNextNtfSubSMPAction db smpServer@(SMPServer smpHost smpPort _) =
(Only connId)
where
err = SEInternal $ "ntf subscription " <> bshow connId <> " returned []"
ntfSubAction (userId, ntfHost, ntfPort, ntfKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, actionTs, action) =
ntfSubAction (userId, ntfHost, ntfPort, ntfKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, action) =
let ntfServer = NtfServer ntfHost ntfPort ntfKeyHash
ntfSubscription = NtfSubscription {userId, connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus}
in (ntfSubscription, action, actionTs)
in (action, ntfSubscription)
markNtfSubActionSMPFailed_ :: DB.Connection -> ConnId -> IO ()
markNtfSubActionSMPFailed_ db connId =