diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 7417b823a..61f9d5da1 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -2067,12 +2067,10 @@ sendNtfConnCommands c cmd = do ns <- asks ntfSupervisor connIds <- liftIO $ S.toList <$> getSubscriptions c rs <- lift $ withStoreBatch' c (\db -> map (getConnData db) connIds) - let (connIds', errs) = enabledNtfConns (zip connIds rs) + let (connIds', cErrs) = enabledNtfConns (zip connIds rs) forM_ (L.nonEmpty connIds') $ \connIds'' -> atomically $ writeTBQueue (ntfSubQ ns) (cmd, connIds'') - -- TODO [batch ntf] notify ERRS - forM_ errs $ \(connId, e) -> - atomically $ writeTBQueue (subQ c) ("", connId, AEvt SAEConn $ ERR e) + unless (null cErrs) $ atomically $ writeTBQueue (subQ c) ("", "", AEvt SAENone $ ERRS cErrs) where enabledNtfConns :: [(ConnId, Either AgentErrorType (Maybe (ConnData, ConnectionMode)))] -> ([ConnId], [(ConnId, AgentErrorType)]) enabledNtfConns = foldr addEnabledConn ([], []) diff --git a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs index c45a3184a..ad1acae5a 100644 --- a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs +++ b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs @@ -5,6 +5,7 @@ {-# LANGUAGE OverloadedLists #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TupleSections #-} {-# OPTIONS_GHC -fno-warn-ambiguous-fields #-} module Simplex.Messaging.Agent.NtfSubSupervisor @@ -23,7 +24,7 @@ import Control.Monad import Control.Monad.Reader import Control.Monad.Trans.Except import Data.Bifunctor (first) -import Data.Either (rights) +import Data.Either (partitionEithers) import Data.Foldable (foldr') import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.List.NonEmpty as L @@ -66,13 +67,18 @@ runNtfSupervisor c = do notifyErr e notifyErr e = notifyInternalError' c $ "runNtfSupervisor error " <> show e --- TODO [batch ntf] notify ERRS +partitionErrs :: (a -> ConnId) -> [a] -> [Either AgentErrorType b] -> ([(ConnId, AgentErrorType)], [b]) +partitionErrs f xs = partitionEithers . zipWith (\x -> first (f x,)) xs +{-# INLINE partitionErrs #-} + processNtfCmd :: AgentClient -> (NtfSupervisorCommand, NonEmpty ConnId) -> AM () processNtfCmd c (cmd, connIds) = do logInfo $ "processNtfCmd - cmd = " <> tshow cmd + let connIds' = L.toList connIds case cmd of NSCCreate -> do - rqSubActions <- lift $ rights <$> withStoreBatch c (\db -> map (getQueueSub db) (L.toList connIds)) + (cErrs, rqSubActions) <- lift $ partitionErrs id connIds' <$> (withStoreBatch c $ \db -> map (getQueueSub db) connIds') + notifyErrs c cErrs logInfo $ "processNtfCmd, NSCCreate - length rqSubs = " <> tshow (length rqSubActions) let (ns, rs, css, cns) = partitionQueueSubActions rqSubActions createNewSubs ns @@ -93,7 +99,8 @@ processNtfCmd c (cmd, connIds) = do createNewSubs rqs = do withTokenServer $ \ntfServer -> do let newSubs = map (rqToNewSub ntfServer) rqs - void $ lift $ withStoreBatch c (\db -> map (storeNewSub db) newSubs) + (cErrs, _) <- lift $ partitionErrs ntfSubConnId newSubs <$> (withStoreBatch c $ \db -> map (storeNewSub db) newSubs) + notifyErrs c cErrs kickSMPWorkers rqs where rqToNewSub :: NtfServer -> RcvQueue -> NtfSubscription @@ -104,7 +111,8 @@ processNtfCmd c (cmd, connIds) = do resetSubs rqSubs = do withTokenServer $ \ntfServer -> do let subsToReset = map (toResetSub ntfServer) rqSubs - lift $ void $ withStoreBatch' c (\db -> map (\sub -> supervisorUpdateNtfSub db sub (NSASMP NSASmpKey)) subsToReset) + (cErrs, _) <- lift $ partitionErrs ntfSubConnId subsToReset <$> (withStoreBatch' c $ \db -> map (storeResetSub db) subsToReset) + notifyErrs c cErrs let rqs = map fst rqSubs kickSMPWorkers rqs where @@ -112,6 +120,9 @@ processNtfCmd c (cmd, connIds) = do toResetSub ntfServer (rq, sub) = let RcvQueue {server = smpServer} = rq in sub {smpServer, ntfQueueId = Nothing, ntfServer, ntfSubId = Nothing, ntfSubStatus = NASNew} + storeResetSub :: DB.Connection -> NtfSubscription -> IO () + storeResetSub db sub = supervisorUpdateNtfSub db sub (NSASMP NSASmpKey) + ntfSubConnId NtfSubscription {connId} = connId partitionQueueSubActions :: [(RcvQueue, Maybe NtfSupervisorSub)] -> ( [RcvQueue], -- new subs @@ -146,13 +157,19 @@ processNtfCmd c (cmd, connIds) = do NSANtf _ -> (ns, rs, css, subNtfServer : cns) reset = (ns, (rq, sub) : rs, css, cns) NSCSmpDelete -> do - rqs <- lift $ rights <$> withStoreBatch c (\db -> map (fmap (first storeError) . getPrimaryRcvQueue db) (L.toList connIds)) + (cErrs, rqs) <- lift $ partitionErrs id connIds' <$> (withStoreBatch c $ \db -> map (getQueue db) connIds') logInfo $ "processNtfCmd, NSCSmpDelete - length rqs = " <> tshow (length rqs) - lift $ void $ withStoreBatch' c (\db -> map (\rq -> supervisorUpdateNtfAction db (qConnId rq) (NSASMP NSASmpDelete)) rqs) + (cErrs', _) <- lift $ partitionErrs qConnId rqs <$> (withStoreBatch' c $ \db -> map (updateAction db) rqs) + notifyErrs c (cErrs <> cErrs') kickSMPWorkers rqs + where + getQueue :: DB.Connection -> ConnId -> IO (Either AgentErrorType RcvQueue) + getQueue db connId = first storeError <$> getPrimaryRcvQueue db connId + updateAction :: DB.Connection -> RcvQueue -> IO () + updateAction db rq = supervisorUpdateNtfAction db (qConnId rq) (NSASMP NSASmpDelete) NSCNtfWorker ntfServer -> lift . void $ getNtfNTFWorker True c ntfServer NSCNtfSMPWorker smpServer -> lift . void $ getNtfSMPWorker True c smpServer - NSCDeleteSub -> void $ lift $ withStoreBatch' c $ \db -> map (deleteNtfSubscription' db) (L.toList connIds) + NSCDeleteSub -> void $ lift $ withStoreBatch' c $ \db -> map (deleteNtfSubscription' db) connIds' where kickSMPWorkers :: [RcvQueue] -> AM () kickSMPWorkers rqs = do @@ -343,6 +360,10 @@ notifyInternalError' :: MonadIO m => AgentClient -> String -> m () notifyInternalError' AgentClient {subQ} internalErrStr = atomically $ writeTBQueue subQ ("", "", AEvt SAEConn $ ERR $ INTERNAL internalErrStr) {-# INLINE notifyInternalError' #-} +notifyErrs :: MonadIO m => AgentClient -> [(ConnId, AgentErrorType)] -> m () +notifyErrs AgentClient {subQ} connErrs = unless (null connErrs) $ atomically $ writeTBQueue subQ ("", "", AEvt SAENone $ ERRS connErrs) +{-# INLINE notifyErrs #-} + getNtfToken :: AM' (Maybe NtfToken) getNtfToken = do tkn <- asks $ ntfTkn . ntfSupervisor diff --git a/src/Simplex/Messaging/Agent/Protocol.hs b/src/Simplex/Messaging/Agent/Protocol.hs index bfa6e46b0..635bb1aee 100644 --- a/src/Simplex/Messaging/Agent/Protocol.hs +++ b/src/Simplex/Messaging/Agent/Protocol.hs @@ -366,6 +366,7 @@ data AEvent (e :: AEntity) where OK :: AEvent AEConn JOINED :: SndQueueSecured -> AEvent AEConn ERR :: AgentErrorType -> AEvent AEConn + ERRS :: [(ConnId, AgentErrorType)] -> AEvent AENone SUSPENDED :: AEvent AENone RFPROG :: Int64 -> Int64 -> AEvent AERcvFile RFDONE :: FilePath -> AEvent AERcvFile @@ -436,6 +437,7 @@ data AEventTag (e :: AEntity) where OK_ :: AEventTag AEConn JOINED_ :: AEventTag AEConn ERR_ :: AEventTag AEConn + ERRS_ :: AEventTag AENone SUSPENDED_ :: AEventTag AENone -- XFTP commands and responses RFDONE_ :: AEventTag AERcvFile @@ -490,6 +492,7 @@ aEventTag = \case OK -> OK_ JOINED _ -> JOINED_ ERR _ -> ERR_ + ERRS _ -> ERRS_ SUSPENDED -> SUSPENDED_ RFPROG {} -> RFPROG_ RFDONE {} -> RFDONE_