mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 14:16:00 +00:00
agent: ntf errs (#1316)
* agent: ntf errs * refactor * errs * refactor * simplify * inline --------- Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>
This commit is contained in:
@@ -2067,12 +2067,10 @@ sendNtfConnCommands c cmd = do
|
||||
ns <- asks ntfSupervisor
|
||||
connIds <- liftIO $ S.toList <$> getSubscriptions c
|
||||
rs <- lift $ withStoreBatch' c (\db -> map (getConnData db) connIds)
|
||||
let (connIds', errs) = enabledNtfConns (zip connIds rs)
|
||||
let (connIds', cErrs) = enabledNtfConns (zip connIds rs)
|
||||
forM_ (L.nonEmpty connIds') $ \connIds'' ->
|
||||
atomically $ writeTBQueue (ntfSubQ ns) (cmd, connIds'')
|
||||
-- TODO [batch ntf] notify ERRS
|
||||
forM_ errs $ \(connId, e) ->
|
||||
atomically $ writeTBQueue (subQ c) ("", connId, AEvt SAEConn $ ERR e)
|
||||
unless (null cErrs) $ atomically $ writeTBQueue (subQ c) ("", "", AEvt SAENone $ ERRS cErrs)
|
||||
where
|
||||
enabledNtfConns :: [(ConnId, Either AgentErrorType (Maybe (ConnData, ConnectionMode)))] -> ([ConnId], [(ConnId, AgentErrorType)])
|
||||
enabledNtfConns = foldr addEnabledConn ([], [])
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
{-# LANGUAGE OverloadedLists #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
|
||||
|
||||
module Simplex.Messaging.Agent.NtfSubSupervisor
|
||||
@@ -23,7 +24,7 @@ import Control.Monad
|
||||
import Control.Monad.Reader
|
||||
import Control.Monad.Trans.Except
|
||||
import Data.Bifunctor (first)
|
||||
import Data.Either (rights)
|
||||
import Data.Either (partitionEithers)
|
||||
import Data.Foldable (foldr')
|
||||
import Data.List.NonEmpty (NonEmpty (..))
|
||||
import qualified Data.List.NonEmpty as L
|
||||
@@ -66,13 +67,18 @@ runNtfSupervisor c = do
|
||||
notifyErr e
|
||||
notifyErr e = notifyInternalError' c $ "runNtfSupervisor error " <> show e
|
||||
|
||||
-- TODO [batch ntf] notify ERRS
|
||||
partitionErrs :: (a -> ConnId) -> [a] -> [Either AgentErrorType b] -> ([(ConnId, AgentErrorType)], [b])
|
||||
partitionErrs f xs = partitionEithers . zipWith (\x -> first (f x,)) xs
|
||||
{-# INLINE partitionErrs #-}
|
||||
|
||||
processNtfCmd :: AgentClient -> (NtfSupervisorCommand, NonEmpty ConnId) -> AM ()
|
||||
processNtfCmd c (cmd, connIds) = do
|
||||
logInfo $ "processNtfCmd - cmd = " <> tshow cmd
|
||||
let connIds' = L.toList connIds
|
||||
case cmd of
|
||||
NSCCreate -> do
|
||||
rqSubActions <- lift $ rights <$> withStoreBatch c (\db -> map (getQueueSub db) (L.toList connIds))
|
||||
(cErrs, rqSubActions) <- lift $ partitionErrs id connIds' <$> (withStoreBatch c $ \db -> map (getQueueSub db) connIds')
|
||||
notifyErrs c cErrs
|
||||
logInfo $ "processNtfCmd, NSCCreate - length rqSubs = " <> tshow (length rqSubActions)
|
||||
let (ns, rs, css, cns) = partitionQueueSubActions rqSubActions
|
||||
createNewSubs ns
|
||||
@@ -93,7 +99,8 @@ processNtfCmd c (cmd, connIds) = do
|
||||
createNewSubs rqs = do
|
||||
withTokenServer $ \ntfServer -> do
|
||||
let newSubs = map (rqToNewSub ntfServer) rqs
|
||||
void $ lift $ withStoreBatch c (\db -> map (storeNewSub db) newSubs)
|
||||
(cErrs, _) <- lift $ partitionErrs ntfSubConnId newSubs <$> (withStoreBatch c $ \db -> map (storeNewSub db) newSubs)
|
||||
notifyErrs c cErrs
|
||||
kickSMPWorkers rqs
|
||||
where
|
||||
rqToNewSub :: NtfServer -> RcvQueue -> NtfSubscription
|
||||
@@ -104,7 +111,8 @@ processNtfCmd c (cmd, connIds) = do
|
||||
resetSubs rqSubs = do
|
||||
withTokenServer $ \ntfServer -> do
|
||||
let subsToReset = map (toResetSub ntfServer) rqSubs
|
||||
lift $ void $ withStoreBatch' c (\db -> map (\sub -> supervisorUpdateNtfSub db sub (NSASMP NSASmpKey)) subsToReset)
|
||||
(cErrs, _) <- lift $ partitionErrs ntfSubConnId subsToReset <$> (withStoreBatch' c $ \db -> map (storeResetSub db) subsToReset)
|
||||
notifyErrs c cErrs
|
||||
let rqs = map fst rqSubs
|
||||
kickSMPWorkers rqs
|
||||
where
|
||||
@@ -112,6 +120,9 @@ processNtfCmd c (cmd, connIds) = do
|
||||
toResetSub ntfServer (rq, sub) =
|
||||
let RcvQueue {server = smpServer} = rq
|
||||
in sub {smpServer, ntfQueueId = Nothing, ntfServer, ntfSubId = Nothing, ntfSubStatus = NASNew}
|
||||
storeResetSub :: DB.Connection -> NtfSubscription -> IO ()
|
||||
storeResetSub db sub = supervisorUpdateNtfSub db sub (NSASMP NSASmpKey)
|
||||
ntfSubConnId NtfSubscription {connId} = connId
|
||||
partitionQueueSubActions ::
|
||||
[(RcvQueue, Maybe NtfSupervisorSub)] ->
|
||||
( [RcvQueue], -- new subs
|
||||
@@ -146,13 +157,19 @@ processNtfCmd c (cmd, connIds) = do
|
||||
NSANtf _ -> (ns, rs, css, subNtfServer : cns)
|
||||
reset = (ns, (rq, sub) : rs, css, cns)
|
||||
NSCSmpDelete -> do
|
||||
rqs <- lift $ rights <$> withStoreBatch c (\db -> map (fmap (first storeError) . getPrimaryRcvQueue db) (L.toList connIds))
|
||||
(cErrs, rqs) <- lift $ partitionErrs id connIds' <$> (withStoreBatch c $ \db -> map (getQueue db) connIds')
|
||||
logInfo $ "processNtfCmd, NSCSmpDelete - length rqs = " <> tshow (length rqs)
|
||||
lift $ void $ withStoreBatch' c (\db -> map (\rq -> supervisorUpdateNtfAction db (qConnId rq) (NSASMP NSASmpDelete)) rqs)
|
||||
(cErrs', _) <- lift $ partitionErrs qConnId rqs <$> (withStoreBatch' c $ \db -> map (updateAction db) rqs)
|
||||
notifyErrs c (cErrs <> cErrs')
|
||||
kickSMPWorkers rqs
|
||||
where
|
||||
getQueue :: DB.Connection -> ConnId -> IO (Either AgentErrorType RcvQueue)
|
||||
getQueue db connId = first storeError <$> getPrimaryRcvQueue db connId
|
||||
updateAction :: DB.Connection -> RcvQueue -> IO ()
|
||||
updateAction db rq = supervisorUpdateNtfAction db (qConnId rq) (NSASMP NSASmpDelete)
|
||||
NSCNtfWorker ntfServer -> lift . void $ getNtfNTFWorker True c ntfServer
|
||||
NSCNtfSMPWorker smpServer -> lift . void $ getNtfSMPWorker True c smpServer
|
||||
NSCDeleteSub -> void $ lift $ withStoreBatch' c $ \db -> map (deleteNtfSubscription' db) (L.toList connIds)
|
||||
NSCDeleteSub -> void $ lift $ withStoreBatch' c $ \db -> map (deleteNtfSubscription' db) connIds'
|
||||
where
|
||||
kickSMPWorkers :: [RcvQueue] -> AM ()
|
||||
kickSMPWorkers rqs = do
|
||||
@@ -343,6 +360,10 @@ notifyInternalError' :: MonadIO m => AgentClient -> String -> m ()
|
||||
notifyInternalError' AgentClient {subQ} internalErrStr = atomically $ writeTBQueue subQ ("", "", AEvt SAEConn $ ERR $ INTERNAL internalErrStr)
|
||||
{-# INLINE notifyInternalError' #-}
|
||||
|
||||
notifyErrs :: MonadIO m => AgentClient -> [(ConnId, AgentErrorType)] -> m ()
|
||||
notifyErrs AgentClient {subQ} connErrs = unless (null connErrs) $ atomically $ writeTBQueue subQ ("", "", AEvt SAENone $ ERRS connErrs)
|
||||
{-# INLINE notifyErrs #-}
|
||||
|
||||
getNtfToken :: AM' (Maybe NtfToken)
|
||||
getNtfToken = do
|
||||
tkn <- asks $ ntfTkn . ntfSupervisor
|
||||
|
||||
@@ -366,6 +366,7 @@ data AEvent (e :: AEntity) where
|
||||
OK :: AEvent AEConn
|
||||
JOINED :: SndQueueSecured -> AEvent AEConn
|
||||
ERR :: AgentErrorType -> AEvent AEConn
|
||||
ERRS :: [(ConnId, AgentErrorType)] -> AEvent AENone
|
||||
SUSPENDED :: AEvent AENone
|
||||
RFPROG :: Int64 -> Int64 -> AEvent AERcvFile
|
||||
RFDONE :: FilePath -> AEvent AERcvFile
|
||||
@@ -436,6 +437,7 @@ data AEventTag (e :: AEntity) where
|
||||
OK_ :: AEventTag AEConn
|
||||
JOINED_ :: AEventTag AEConn
|
||||
ERR_ :: AEventTag AEConn
|
||||
ERRS_ :: AEventTag AENone
|
||||
SUSPENDED_ :: AEventTag AENone
|
||||
-- XFTP commands and responses
|
||||
RFDONE_ :: AEventTag AERcvFile
|
||||
@@ -490,6 +492,7 @@ aEventTag = \case
|
||||
OK -> OK_
|
||||
JOINED _ -> JOINED_
|
||||
ERR _ -> ERR_
|
||||
ERRS _ -> ERRS_
|
||||
SUSPENDED -> SUSPENDED_
|
||||
RFPROG {} -> RFPROG_
|
||||
RFDONE {} -> RFDONE_
|
||||
|
||||
Reference in New Issue
Block a user