diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 3dab2ab39..d1fefa3cf 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -2177,15 +2177,13 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(_, srv, _), _v, sessId, ts) Left e -> notify' connId (ERR e) Right () -> pure () processSubOk :: RcvQueue -> TVar [ConnId] -> AM () - processSubOk rq@RcvQueue {userId, connId} upConnIds = do + processSubOk rq@RcvQueue {connId} upConnIds = atomically . whenM (isPendingSub connId) $ do addSubscription c rq modifyTVar' upConnIds (connId :) - atomically $ incSMPServerStat c userId srv connSubscribed processSubErr :: RcvQueue -> SMPClientError -> AM () - processSubErr rq@RcvQueue {userId, connId} e = do + processSubErr rq@RcvQueue {connId} e = do atomically . whenM (isPendingSub connId) $ failSubscription c rq e - atomically $ incSMPServerStat c userId srv connSubErrs lift $ notifyErr connId e isPendingSub connId = (&&) <$> hasPendingSubscription c connId <*> activeClientSession c tSess sessId notify' :: forall e m. (AEntityI e, MonadIO m) => ConnId -> AEvent e -> m () diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 0a25e4741..89dbd12dc 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -176,7 +176,7 @@ import Data.Bifunctor (bimap, first, second) import Data.ByteString.Base64 import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B -import Data.Either (partitionEithers) +import Data.Either (isRight, partitionEithers) import Data.Functor (($>)) import Data.List (deleteFirstsBy, foldl', partition, (\\)) import Data.List.NonEmpty (NonEmpty (..), (<|)) @@ -1405,6 +1405,9 @@ subscribeQueues c qs = do let (userId, srv, _) = transportSession' smp atomically $ incSMPServerStat' c userId srv connSubAttempts (length qs') rs <- sendBatch subscribeSMPQueues smp qs' + let (successes, errs) = partition (isRight . snd) (L.toList rs) + atomically $ incSMPServerStat' c userId srv connSubscribed (length successes) + atomically $ incSMPServerStat' c userId srv connSubErrs (length errs) active <- atomically $ ifM