mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 16:26:02 +00:00
agent: enable notifications for all connections (#1262)
This commit is contained in:
@@ -959,12 +959,12 @@ subscribeConnections' c connIds = do
|
||||
let (errs, cs) = M.mapEither id conns
|
||||
errs' = M.map (Left . storeError) errs
|
||||
(subRs, rcvQs) = M.mapEither rcvQueueOrResult cs
|
||||
mapM_ (mapM_ (\(cData, sqs) -> mapM_ (lift . resumeMsgDelivery c cData) sqs) . sndQueue) cs
|
||||
lift $ mapM_ (mapM_ (\(cData, sqs) -> mapM_ (resumeMsgDelivery c cData) sqs) . sndQueue) cs
|
||||
lift $ resumeConnCmds c $ M.keys cs
|
||||
rcvRs <- lift $ connResults . fst <$> subscribeQueues c (concat $ M.elems rcvQs)
|
||||
ns <- asks ntfSupervisor
|
||||
tkn <- readTVarIO (ntfTkn ns)
|
||||
when (instantNotifications tkn) . void . lift . forkIO . void . runExceptT $ sendNtfCreate ns rcvRs conns
|
||||
lift $ when (instantNotifications tkn) . void . forkIO . void $ sendNtfCreate ns cs
|
||||
let rs = M.unions ([errs', subRs, rcvRs] :: [Map ConnId (Either AgentErrorType ())])
|
||||
notifyResultError rs
|
||||
pure rs
|
||||
@@ -996,15 +996,15 @@ subscribeConnections' c connIds = do
|
||||
order (Active, _) = 2
|
||||
order (_, Right _) = 3
|
||||
order _ = 4
|
||||
sendNtfCreate :: NtfSupervisor -> Map ConnId (Either AgentErrorType ()) -> Map ConnId (Either StoreError SomeConn) -> AM ()
|
||||
sendNtfCreate ns rcvRs conns =
|
||||
forM_ (M.assocs rcvRs) $ \case
|
||||
(connId, Right _) -> forM_ (M.lookup connId conns) $ \case
|
||||
Right (SomeConn _ conn) -> do
|
||||
let cmd = if enableNtfs $ toConnData conn then NSCCreate else NSCDelete
|
||||
atomically $ writeTBQueue (ntfSubQ ns) (connId, cmd)
|
||||
_ -> pure ()
|
||||
_ -> pure ()
|
||||
sendNtfCreate :: NtfSupervisor -> Map ConnId SomeConn -> AM' ()
|
||||
sendNtfCreate ns cs =
|
||||
-- TODO this needs to be batched end to end.
|
||||
-- Currently, the only change is to ignore failed subscriptions.
|
||||
forM_ cs $ \case
|
||||
SomeConn _ conn -> do
|
||||
let cmd = if enableNtfs $ toConnData conn then NSCCreate else NSCDelete
|
||||
ConnData {connId} = toConnData conn
|
||||
atomically $ writeTBQueue (ntfSubQ ns) (connId, cmd)
|
||||
sndQueue :: SomeConn -> Maybe (ConnData, NonEmpty SndQueue)
|
||||
sndQueue (SomeConn _ conn) = case conn of
|
||||
DuplexConnection cData _ sqs -> Just (cData, sqs)
|
||||
|
||||
Reference in New Issue
Block a user