diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 88d1eeac2..f578358f1 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -56,8 +56,10 @@ module Simplex.Messaging.Agent.Client secureQueue, secureSndQueue, enableQueueNotifications, + EnableQueueNtfReq (..), enableQueuesNtfs, disableQueueNotifications, + DisableQueueNtfReq, disableQueuesNtfs, sendAgentMessage, getQueueInfo, @@ -257,8 +259,8 @@ import Simplex.Messaging.Protocol VersionSMPC, XFTPServer, XFTPServerWithAuth, - pattern NoEntity, sameSrvAddr', + pattern NoEntity, ) import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Server.QueueStore.QueueInfo @@ -1606,24 +1608,39 @@ enableQueueNotifications c rq@RcvQueue {rcvId, rcvPrivateKey} notifierKey rcvNtf withSMPClient c rq "NKEY " $ \smp -> enableSMPQueueNotifications smp rcvPrivateKey rcvId notifierKey rcvNtfPublicDhKey -type RcvQueueNtf = (RcvQueue, SMP.NtfPublicAuthKey, SMP.RcvNtfPublicDhKey) +data EnableQueueNtfReq = EnableQueueNtfReq + { eqnrNtfSub :: NtfSubscription, + eqnrRq :: RcvQueue, + eqnrAuthKeyPair :: C.AAuthKeyPair, + eqnrRcvKeyPair :: C.KeyPairX25519 + } -enableQueuesNtfs :: AgentClient -> [RcvQueueNtf] -> AM' [(RcvQueueNtf, Either AgentErrorType (SMP.NotifierId, SMP.RcvNtfPublicDhKey))] -enableQueuesNtfs = sendTSessionBatches "NKEY" fst3 enableQueues_ +enableQueuesNtfs :: AgentClient -> [EnableQueueNtfReq] -> AM' [(EnableQueueNtfReq, Either AgentErrorType (SMP.NotifierId, SMP.RcvNtfPublicDhKey))] +enableQueuesNtfs = sendTSessionBatches "NKEY" eqnrRq enableQueues_ where - fst3 (x, _, _) = x - enableQueues_ :: SMPClient -> NonEmpty RcvQueueNtf -> IO (NonEmpty (RcvQueueNtf, Either (ProtocolClientError ErrorType) (SMP.NotifierId, RcvNtfPublicDhKey))) - enableQueues_ smp qs' = L.zipWith (,) qs' <$> enableSMPQueuesNtfs smp (L.map queueCreds qs') - queueCreds :: RcvQueueNtf -> (SMP.RcvPrivateAuthKey, SMP.RecipientId, SMP.NtfPublicAuthKey, SMP.RcvNtfPublicDhKey) - queueCreds (RcvQueue {rcvPrivateKey, rcvId}, notifierKey, rcvNtfPublicDhKey) = (rcvPrivateKey, rcvId, notifierKey, rcvNtfPublicDhKey) + enableQueues_ :: SMPClient -> NonEmpty EnableQueueNtfReq -> IO (NonEmpty (EnableQueueNtfReq, Either (ProtocolClientError ErrorType) (SMP.NotifierId, RcvNtfPublicDhKey))) + enableQueues_ smp qs' = L.zip qs' <$> enableSMPQueuesNtfs smp (L.map queueCreds qs') + queueCreds :: EnableQueueNtfReq -> (SMP.RcvPrivateAuthKey, SMP.RecipientId, SMP.NtfPublicAuthKey, SMP.RcvNtfPublicDhKey) + queueCreds EnableQueueNtfReq {eqnrRq, eqnrAuthKeyPair, eqnrRcvKeyPair} = + let RcvQueue {rcvPrivateKey, rcvId} = eqnrRq + (ntfPublicKey, _) = eqnrAuthKeyPair + (rcvNtfPubDhKey, _) = eqnrRcvKeyPair + in (rcvPrivateKey, rcvId, ntfPublicKey, rcvNtfPubDhKey) disableQueueNotifications :: AgentClient -> RcvQueue -> AM () disableQueueNotifications c rq@RcvQueue {rcvId, rcvPrivateKey} = withSMPClient c rq "NDEL" $ \smp -> disableSMPQueueNotifications smp rcvPrivateKey rcvId -disableQueuesNtfs :: AgentClient -> [RcvQueue] -> AM' [(RcvQueue, Either AgentErrorType ())] -disableQueuesNtfs = sendTSessionBatches "NDEL" id $ sendBatch disableSMPQueuesNtfs +type DisableQueueNtfReq = (NtfSubscription, RcvQueue) + +disableQueuesNtfs :: AgentClient -> [DisableQueueNtfReq] -> AM' [(DisableQueueNtfReq, Either AgentErrorType ())] +disableQueuesNtfs = sendTSessionBatches "NDEL" snd disableQueues_ + where + disableQueues_ :: SMPClient -> NonEmpty DisableQueueNtfReq -> IO (NonEmpty (DisableQueueNtfReq, Either (ProtocolClientError ErrorType) ())) + disableQueues_ smp qs' = L.zip qs' <$> disableSMPQueuesNtfs smp (L.map queueCreds qs') + queueCreds :: DisableQueueNtfReq -> (SMP.RcvPrivateAuthKey, SMP.RecipientId) + queueCreds (_, RcvQueue {rcvPrivateKey, rcvId}) = (rcvPrivateKey, rcvId) sendAck :: AgentClient -> RcvQueue -> MsgId -> AM () sendAck c rq@RcvQueue {rcvId, rcvPrivateKey} msgId = do @@ -1830,11 +1847,12 @@ withWorkItems c doWork getWork action = do Just items' -> action items' Nothing -> do let criticalErr = find workItemError errs - forM_ criticalErr $ \ err -> do + forM_ criticalErr $ \err -> do notifyErr (CRITICAL False) err when (all workItemError errs) noWork - unless (null errs) $ atomically $ - writeTBQueue (subQ c) ("", "", AEvt SAENone $ ERRS $ map (\e -> ("", INTERNAL $ show e)) errs) + unless (null errs) $ + atomically $ + writeTBQueue (subQ c) ("", "", AEvt SAENone $ ERRS $ map (\e -> ("", INTERNAL $ show e)) errs) Left e | workItemError e -> noWork >> notifyErr (CRITICAL False) e | otherwise -> notifyErr INTERNAL e diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index 0f179c70c..a550394c1 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -150,6 +150,7 @@ data AgentConfig = AgentConfig xftpMaxRecipientsPerRequest :: Int, deleteErrorCount :: Int, ntfCron :: Word16, + ntfBatchSize :: Int, ntfSubCheckInterval :: NominalDiffTime, caCertificateFile :: FilePath, privateKeyFile :: FilePath, @@ -219,6 +220,7 @@ defaultAgentConfig = xftpMaxRecipientsPerRequest = 200, deleteErrorCount = 10, ntfCron = 20, -- minutes + ntfBatchSize = 200, ntfSubCheckInterval = nominalDay, -- CA certificate private key is not needed for initialization -- ! we do not generate these @@ -259,7 +261,7 @@ data NtfSupervisor = NtfSupervisor ntfSMPWorkers :: TMap SMPServer Worker } -data NtfSupervisorCommand = NSCCreate | NSCSmpDelete | NSCNtfWorker NtfServer | NSCNtfSMPWorker SMPServer | NSCDeleteSub +data NtfSupervisorCommand = NSCCreate | NSCSmpDelete | NSCDeleteSub deriving (Show) newNtfSubSupervisor :: Natural -> IO NtfSupervisor diff --git a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs index ad1acae5a..9a949eec4 100644 --- a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs +++ b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs @@ -23,6 +23,7 @@ import Control.Logger.Simple (logError, logInfo) import Control.Monad import Control.Monad.Reader import Control.Monad.Trans.Except +import Crypto.Random (ChaChaDRG) import Data.Bifunctor (first) import Data.Either (partitionEithers) import Data.Foldable (foldr') @@ -45,6 +46,7 @@ import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Notifications.Protocol (NtfSubStatus (..), NtfTknStatus (..), SMPQueueNtf (..)) import Simplex.Messaging.Notifications.Types import Simplex.Messaging.Protocol (NtfServer, sameSrvAddr) +import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Util (diffToMicroseconds, threadDelay', tshow, unlessM) import System.Random (randomR) import UnliftIO @@ -71,13 +73,16 @@ partitionErrs :: (a -> ConnId) -> [a] -> [Either AgentErrorType b] -> ([(ConnId, partitionErrs f xs = partitionEithers . zipWith (\x -> first (f x,)) xs {-# INLINE partitionErrs #-} +ntfSubConnId :: NtfSubscription -> ConnId +ntfSubConnId NtfSubscription {connId} = connId + processNtfCmd :: AgentClient -> (NtfSupervisorCommand, NonEmpty ConnId) -> AM () processNtfCmd c (cmd, connIds) = do logInfo $ "processNtfCmd - cmd = " <> tshow cmd let connIds' = L.toList connIds case cmd of NSCCreate -> do - (cErrs, rqSubActions) <- lift $ partitionErrs id connIds' <$> (withStoreBatch c $ \db -> map (getQueueSub db) connIds') + (cErrs, rqSubActions) <- lift $ partitionErrs id connIds' <$> withStoreBatch c (\db -> map (getQueueSub db) connIds') notifyErrs c cErrs logInfo $ "processNtfCmd, NSCCreate - length rqSubs = " <> tshow (length rqSubActions) let (ns, rs, css, cns) = partitionQueueSubActions rqSubActions @@ -99,7 +104,7 @@ processNtfCmd c (cmd, connIds) = do createNewSubs rqs = do withTokenServer $ \ntfServer -> do let newSubs = map (rqToNewSub ntfServer) rqs - (cErrs, _) <- lift $ partitionErrs ntfSubConnId newSubs <$> (withStoreBatch c $ \db -> map (storeNewSub db) newSubs) + (cErrs, _) <- lift $ partitionErrs ntfSubConnId newSubs <$> withStoreBatch c (\db -> map (storeNewSub db) newSubs) notifyErrs c cErrs kickSMPWorkers rqs where @@ -111,7 +116,7 @@ processNtfCmd c (cmd, connIds) = do resetSubs rqSubs = do withTokenServer $ \ntfServer -> do let subsToReset = map (toResetSub ntfServer) rqSubs - (cErrs, _) <- lift $ partitionErrs ntfSubConnId subsToReset <$> (withStoreBatch' c $ \db -> map (storeResetSub db) subsToReset) + (cErrs, _) <- lift $ partitionErrs ntfSubConnId subsToReset <$> withStoreBatch' c (\db -> map (storeResetSub db) subsToReset) notifyErrs c cErrs let rqs = map fst rqSubs kickSMPWorkers rqs @@ -122,7 +127,6 @@ processNtfCmd c (cmd, connIds) = do in sub {smpServer, ntfQueueId = Nothing, ntfServer, ntfSubId = Nothing, ntfSubStatus = NASNew} storeResetSub :: DB.Connection -> NtfSubscription -> IO () storeResetSub db sub = supervisorUpdateNtfSub db sub (NSASMP NSASmpKey) - ntfSubConnId NtfSubscription {connId} = connId partitionQueueSubActions :: [(RcvQueue, Maybe NtfSupervisorSub)] -> ( [RcvQueue], -- new subs @@ -157,9 +161,9 @@ processNtfCmd c (cmd, connIds) = do NSANtf _ -> (ns, rs, css, subNtfServer : cns) reset = (ns, (rq, sub) : rs, css, cns) NSCSmpDelete -> do - (cErrs, rqs) <- lift $ partitionErrs id connIds' <$> (withStoreBatch c $ \db -> map (getQueue db) connIds') + (cErrs, rqs) <- lift $ partitionErrs id connIds' <$> withStoreBatch c (\db -> map (getQueue db) connIds') logInfo $ "processNtfCmd, NSCSmpDelete - length rqs = " <> tshow (length rqs) - (cErrs', _) <- lift $ partitionErrs qConnId rqs <$> (withStoreBatch' c $ \db -> map (updateAction db) rqs) + (cErrs', _) <- lift $ partitionErrs qConnId rqs <$> withStoreBatch' c (\db -> map (updateAction db) rqs) notifyErrs c (cErrs <> cErrs') kickSMPWorkers rqs where @@ -167,8 +171,6 @@ processNtfCmd c (cmd, connIds) = do getQueue db connId = first storeError <$> getPrimaryRcvQueue db connId updateAction :: DB.Connection -> RcvQueue -> IO () updateAction db rq = supervisorUpdateNtfAction db (qConnId rq) (NSASMP NSASmpDelete) - NSCNtfWorker ntfServer -> lift . void $ getNtfNTFWorker True c ntfServer - NSCNtfSMPWorker smpServer -> lift . void $ getNtfSMPWorker True c smpServer NSCDeleteSub -> void $ lift $ withStoreBatch' c $ \db -> map (deleteNtfSubscription' db) connIds' where kickSMPWorkers :: [RcvQueue] -> AM () @@ -236,8 +238,7 @@ runNtfWorker c srv Worker {doWork} = NSAuth -> do withStore' c $ \db -> updateNtfSubscription db sub {ntfServer, ntfQueueId = Nothing, ntfSubId = Nothing, ntfSubStatus = NASNew} (NSASMP NSASmpKey) ts - ns <- asks ntfSupervisor - atomically $ writeTBQueue (ntfSubQ ns) (NSCNtfSMPWorker smpServer, [connId]) + lift . void $ getNtfSMPWorker True c smpServer status -> updateSubNextCheck ts status atomically $ incNtfServerStat c userId ntfServer ntfChecked Nothing -> workerInternalError c connId "NSACheck - no subscription ID" @@ -247,8 +248,7 @@ runNtfWorker c srv Worker {doWork} = deleteNtfSub $ do let sub' = sub {ntfSubId = Nothing, ntfSubStatus = NASOff} withStore' c $ \db -> updateNtfSubscription db sub' (NSASMP NSASmpDelete) ts - ns <- asks ntfSupervisor - atomically $ writeTBQueue (ntfSubQ ns) (NSCNtfSMPWorker smpServer, [connId]) -- TODO [batch ntf] loop + lift . void $ getNtfSMPWorker True c smpServer NSARotate -> deleteNtfSub $ do withStore' c $ \db -> deleteNtfSubscription db connId @@ -276,51 +276,102 @@ runNtfWorker c srv Worker {doWork} = updateNtfSubscription db sub {ntfSubStatus = toStatus} toAction actionTs' runNtfSMPWorker :: AgentClient -> SMPServer -> Worker -> AM () -runNtfSMPWorker c srv Worker {doWork} = do - env <- ask - forever $ do - waitForWork doWork - ExceptT . liftIO . agentOperationBracket c AONtfNetwork throwWhenInactive $ - runReaderT (runExceptT runNtfSMPOperation) env +runNtfSMPWorker c srv Worker {doWork} = forever $ do + waitForWork doWork + ExceptT $ agentOperationBracket c AONtfNetwork throwWhenInactive $ runExceptT runNtfSMPOperation where - runNtfSMPOperation = - withWork c doWork (`getNextNtfSubSMPAction` srv) $ - \nextSub@(NtfSubscription {connId}, _, _) -> do - logInfo $ "runNtfSMPWorker, nextSub " <> tshow nextSub - ri <- asks $ reconnectInterval . config - withRetryInterval ri $ \_ loop -> do - liftIO $ waitWhileSuspended c - liftIO $ waitForUserNetwork c - processSub nextSub - `catchAgentError` retryOnError c "NtfSMPWorker" loop (workerInternalError c connId . show) - processSub :: (NtfSubscription, NtfSubSMPAction, NtfActionTs) -> AM () - processSub (sub@NtfSubscription {connId, ntfServer}, smpAction, actionTs) = do - ts <- liftIO getCurrentTime - unlessM (lift $ rescheduleAction doWork ts actionTs) $ - case smpAction of - NSASmpKey -> - lift getNtfToken >>= \case - Just NtfToken {ntfTknStatus = NTActive, ntfMode = NMInstant} -> do - rq <- withStore c (`getPrimaryRcvQueue` connId) - C.AuthAlg a <- asks (rcvAuthAlg . config) - g <- asks random - (ntfPublicKey, ntfPrivateKey) <- atomically $ C.generateAuthKeyPair a g - (rcvNtfPubDhKey, rcvNtfPrivDhKey) <- atomically $ C.generateKeyPair g - (notifierId, rcvNtfSrvPubDhKey) <- enableQueueNotifications c rq ntfPublicKey rcvNtfPubDhKey - let rcvNtfDhSecret = C.dh' rcvNtfSrvPubDhKey rcvNtfPrivDhKey - withStore' c $ \db -> do - setRcvQueueNtfCreds db connId $ Just ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret} - updateNtfSubscription db sub {ntfQueueId = Just notifierId, ntfSubStatus = NASKey} (NSANtf NSACreate) ts - ns <- asks ntfSupervisor - atomically $ sendNtfSubCommand ns (NSCNtfWorker ntfServer, [connId]) - _ -> workerInternalError c connId "NSASmpKey - no active token" - NSASmpDelete -> do - -- TODO should we remove it after successful removal from the server? - rq_ <- withStore' c $ \db -> do - setRcvQueueNtfCreds db connId Nothing - getPrimaryRcvQueue db connId - mapM_ (disableQueueNotifications c) rq_ - withStore' c $ \db -> deleteNtfSubscription db connId + runNtfSMPOperation :: AM () + runNtfSMPOperation = do + ntfBatchSize <- asks $ ntfBatchSize . config + withWorkItems c doWork (\db -> getNextNtfSubSMPActions db srv ntfBatchSize) $ \nextSubs -> do + logInfo $ "runNtfSMPWorker - length nextSubs = " <> tshow (length nextSubs) + let (creates, deletes) = splitActions nextSubs + retrySubActions creates createNotifierKeys + retrySubActions deletes deleteNotifierKeys + splitActions :: NonEmpty (NtfSubSMPAction, NtfSubscription) -> ([NtfSubscription], [NtfSubscription]) + splitActions = foldr addAction ([], []) + where + addAction action (creates, deletes) = case action of + (NSASmpKey, sub) -> (sub : creates, deletes) + (NSASmpDelete, sub) -> (creates, sub : deletes) + retrySubActions :: [NtfSubscription] -> ([NtfSubscription] -> AM' [NtfSubscription]) -> AM () + retrySubActions subs action = do + v <- newTVarIO subs + ri <- asks $ reconnectInterval . config + withRetryInterval ri $ \_ loop -> do + liftIO $ waitWhileSuspended c + liftIO $ waitForUserNetwork c + subs' <- readTVarIO v + retrySubs <- lift $ action subs' + unless (null retrySubs) $ do + atomically $ writeTVar v retrySubs + retryNetworkLoop c loop + createNotifierKeys :: [NtfSubscription] -> AM' [NtfSubscription] + createNotifierKeys ntfSubs = + getNtfToken >>= \case + Just NtfToken {ntfTknStatus = NTActive, ntfMode = NMInstant} -> do + (errs1, subRqKeys) <- prepareQueueSmpKey ntfSubs + rs <- enableQueuesNtfs c subRqKeys + let (subRqKeys', errs2, successes) = splitResults rs + ntfSubs' = map eqnrNtfSub subRqKeys' + errs2' = map (first (qConnId . eqnrRq)) errs2 + ts <- liftIO getCurrentTime + (errs3, srvs) <- partitionErrs (qConnId . eqnrRq . fst) successes <$> withStoreBatch' c (\db -> map (storeNtfSubCreds db ts) successes) + mapM_ (getNtfNTFWorker True c) $ S.fromList srvs + workerErrors c $ errs1 <> errs2' <> errs3 + pure ntfSubs' + _ -> do + let errs = map (\sub -> (ntfSubConnId sub, INTERNAL "NSASmpKey - no active token")) ntfSubs + workerErrors c errs + pure [] + where + prepareQueueSmpKey :: [NtfSubscription] -> AM' ([(ConnId, AgentErrorType)], [EnableQueueNtfReq]) + prepareQueueSmpKey subs = do + alg <- asks (rcvAuthAlg . config) + g <- asks random + partitionErrs ntfSubConnId subs <$> withStoreBatch c (\db -> map (getQueue db alg g) subs) + where + getQueue :: DB.Connection -> C.AuthAlg -> TVar ChaChaDRG -> NtfSubscription -> IO (Either AgentErrorType EnableQueueNtfReq) + getQueue db (C.AuthAlg a) g sub = fmap (first storeError) $ runExceptT $ do + rq <- ExceptT $ getPrimaryRcvQueue db (ntfSubConnId sub) + authKeyPair <- atomically $ C.generateAuthKeyPair a g + rcvNtfKeyPair <- atomically $ C.generateKeyPair g + pure (EnableQueueNtfReq sub rq authKeyPair rcvNtfKeyPair) + storeNtfSubCreds :: DB.Connection -> UTCTime -> (EnableQueueNtfReq, (SMP.NotifierId, SMP.RcvNtfPublicDhKey)) -> IO NtfServer + storeNtfSubCreds db ts (EnableQueueNtfReq {eqnrNtfSub, eqnrAuthKeyPair = (ntfPublicKey, ntfPrivateKey), eqnrRcvKeyPair = (_, pk)}, (notifierId, srvPubDhKey)) = do + let NtfSubscription {ntfServer} = eqnrNtfSub + rcvNtfDhSecret = C.dh' srvPubDhKey pk + setRcvQueueNtfCreds db (ntfSubConnId eqnrNtfSub) $ Just ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret} + updateNtfSubscription db eqnrNtfSub {ntfQueueId = Just notifierId, ntfSubStatus = NASKey} (NSANtf NSACreate) ts + pure ntfServer + deleteNotifierKeys :: [NtfSubscription] -> AM' [NtfSubscription] + deleteNotifierKeys ntfSubs = do + (errs1, subRqs) <- partitionErrs ntfSubConnId ntfSubs <$> withStoreBatch c (\db -> map (resetCredsGetQueue db) ntfSubs) + rs <- disableQueuesNtfs c subRqs + let (subRqs', errs2, successes) = splitResults rs + ntfSubs' = map fst subRqs' + errs2' = map (first (qConnId . snd)) errs2 + disabledRqs = map (snd . fst) successes + (errs3, _) <- partitionErrs qConnId disabledRqs <$> withStoreBatch' c (\db -> map (deleteSub db) disabledRqs) + workerErrors c $ errs1 <> errs2' <> errs3 + pure ntfSubs' + where + resetCredsGetQueue :: DB.Connection -> NtfSubscription -> IO (Either AgentErrorType DisableQueueNtfReq) + resetCredsGetQueue db sub@NtfSubscription {connId} = fmap (first storeError) $ runExceptT $ do + liftIO $ setRcvQueueNtfCreds db connId Nothing + rq <- ExceptT $ getPrimaryRcvQueue db connId + pure (sub, rq) + deleteSub :: DB.Connection -> RcvQueue -> IO () + deleteSub db rq = deleteNtfSubscription db (qConnId rq) + -- (temporary errs, other errs, successes) + splitResults :: [(a, Either AgentErrorType r)] -> ([a], [(a, AgentErrorType)], [(a, r)]) + splitResults = foldr' addRes ([], [], []) + where + addRes (a, r_) (as, errs, rs) = case r_ of + Right r -> (as, errs, (a, r) : rs) + Left e + | tempErr e -> (a : as, errs, rs) + | otherwise -> (as, (a, e) : errs, rs) rescheduleAction :: TMVar () -> UTCTime -> UTCTime -> AM' Bool rescheduleAction doWork ts actionTs @@ -335,16 +386,28 @@ rescheduleAction doWork ts actionTs retryOnError :: AgentClient -> Text -> AM () -> (AgentErrorType -> AM ()) -> AgentErrorType -> AM () retryOnError c name loop done e = do logError $ name <> " error: " <> tshow e - case e of - BROKER _ NETWORK -> retryLoop - BROKER _ TIMEOUT -> retryLoop - _ -> done e - where - retryLoop = do - atomically $ endAgentOperation c AONtfNetwork - liftIO $ throwWhenInactive c - atomically $ beginAgentOperation c AONtfNetwork - loop + if tempErr e + then retryNetworkLoop c loop + else done e + +tempErr :: AgentErrorType -> Bool +tempErr = \case + BROKER _ NETWORK -> True + BROKER _ TIMEOUT -> True + _ -> False + +retryNetworkLoop :: AgentClient -> AM () -> AM () +retryNetworkLoop c loop = do + atomically $ endAgentOperation c AONtfNetwork + liftIO $ throwWhenInactive c + atomically $ beginAgentOperation c AONtfNetwork + loop + +workerErrors :: AgentClient -> [(ConnId, AgentErrorType)] -> AM' () +workerErrors c connErrs = + unless (null connErrs) $ do + void $ withStoreBatch' c (\db -> map (setNullNtfSubscriptionAction db . fst) connErrs) + notifyErrs c connErrs workerInternalError :: AgentClient -> ConnId -> String -> AM () workerInternalError c connId internalErrStr = do diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index c331c4d44..701e03af4 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -162,7 +162,7 @@ module Simplex.Messaging.Agent.Store.SQLite deleteNtfSubscription', getNextNtfSubNTFAction, markNtfSubActionNtfFailed_, -- exported for tests - getNextNtfSubSMPAction, + getNextNtfSubSMPActions, markNtfSubActionSMPFailed_, -- exported for tests getActiveNtfToken, getNtfRcvQueue, @@ -1671,14 +1671,14 @@ markNtfSubActionNtfFailed_ :: DB.Connection -> ConnId -> IO () markNtfSubActionNtfFailed_ db connId = DB.execute db "UPDATE ntf_subscriptions SET ntf_failed = 1 where conn_id = ?" (Only connId) -getNextNtfSubSMPAction :: DB.Connection -> SMPServer -> IO (Either StoreError (Maybe (NtfSubscription, NtfSubSMPAction, NtfActionTs))) -getNextNtfSubSMPAction db smpServer@(SMPServer smpHost smpPort _) = - getWorkItem "ntf SMP" getNtfConnId getNtfSubAction (markNtfSubActionSMPFailed_ db) +getNextNtfSubSMPActions :: DB.Connection -> SMPServer -> Int -> IO (Either StoreError [Either StoreError (NtfSubSMPAction, NtfSubscription)]) +getNextNtfSubSMPActions db smpServer@(SMPServer smpHost smpPort _) ntfBatchSize = + getWorkItems "ntf SMP" getNtfConnIds getNtfSubAction (markNtfSubActionSMPFailed_ db) where - getNtfConnId :: IO (Maybe ConnId) - getNtfConnId = - maybeFirstRow fromOnly $ - DB.query + getNtfConnIds :: IO [ConnId] + getNtfConnIds = + map fromOnly + <$> DB.query db [sql| SELECT conn_id @@ -1686,10 +1686,10 @@ getNextNtfSubSMPAction db smpServer@(SMPServer smpHost smpPort _) = WHERE smp_host = ? AND smp_port = ? AND ntf_sub_smp_action IS NOT NULL AND ntf_sub_action_ts IS NOT NULL AND (smp_failed = 0 OR updated_by_supervisor = 1) ORDER BY ntf_sub_action_ts ASC - LIMIT 1 + LIMIT ? |] - (smpHost, smpPort) - getNtfSubAction :: ConnId -> IO (Either StoreError (NtfSubscription, NtfSubSMPAction, NtfActionTs)) + (smpHost, smpPort, ntfBatchSize) + getNtfSubAction :: ConnId -> IO (Either StoreError (NtfSubSMPAction, NtfSubscription)) getNtfSubAction connId = do markUpdatedByWorker db connId firstRow ntfSubAction err $ @@ -1697,7 +1697,7 @@ getNextNtfSubSMPAction db smpServer@(SMPServer smpHost smpPort _) = db [sql| SELECT c.user_id, s.ntf_host, s.ntf_port, s.ntf_key_hash, - ns.smp_ntf_id, ns.ntf_sub_id, ns.ntf_sub_status, ns.ntf_sub_action_ts, ns.ntf_sub_smp_action + ns.smp_ntf_id, ns.ntf_sub_id, ns.ntf_sub_status, ns.ntf_sub_smp_action FROM ntf_subscriptions ns JOIN connections c USING (conn_id) JOIN ntf_servers s USING (ntf_host, ntf_port) @@ -1706,10 +1706,10 @@ getNextNtfSubSMPAction db smpServer@(SMPServer smpHost smpPort _) = (Only connId) where err = SEInternal $ "ntf subscription " <> bshow connId <> " returned []" - ntfSubAction (userId, ntfHost, ntfPort, ntfKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, actionTs, action) = + ntfSubAction (userId, ntfHost, ntfPort, ntfKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, action) = let ntfServer = NtfServer ntfHost ntfPort ntfKeyHash ntfSubscription = NtfSubscription {userId, connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus} - in (ntfSubscription, action, actionTs) + in (action, ntfSubscription) markNtfSubActionSMPFailed_ :: DB.Connection -> ConnId -> IO () markNtfSubActionSMPFailed_ db connId =