mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-26 18:26:17 +00:00
change sub counting
This commit is contained in:
@@ -2177,15 +2177,13 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(_, srv, _), _v, sessId, ts)
|
||||
Left e -> notify' connId (ERR e)
|
||||
Right () -> pure ()
|
||||
processSubOk :: RcvQueue -> TVar [ConnId] -> AM ()
|
||||
processSubOk rq@RcvQueue {userId, connId} upConnIds = do
|
||||
processSubOk rq@RcvQueue {connId} upConnIds =
|
||||
atomically . whenM (isPendingSub connId) $ do
|
||||
addSubscription c rq
|
||||
modifyTVar' upConnIds (connId :)
|
||||
atomically $ incSMPServerStat c userId srv connSubscribed
|
||||
processSubErr :: RcvQueue -> SMPClientError -> AM ()
|
||||
processSubErr rq@RcvQueue {userId, connId} e = do
|
||||
processSubErr rq@RcvQueue {connId} e = do
|
||||
atomically . whenM (isPendingSub connId) $ failSubscription c rq e
|
||||
atomically $ incSMPServerStat c userId srv connSubErrs
|
||||
lift $ notifyErr connId e
|
||||
isPendingSub connId = (&&) <$> hasPendingSubscription c connId <*> activeClientSession c tSess sessId
|
||||
notify' :: forall e m. (AEntityI e, MonadIO m) => ConnId -> AEvent e -> m ()
|
||||
|
||||
@@ -176,7 +176,7 @@ import Data.Bifunctor (bimap, first, second)
|
||||
import Data.ByteString.Base64
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Either (partitionEithers)
|
||||
import Data.Either (isRight, partitionEithers)
|
||||
import Data.Functor (($>))
|
||||
import Data.List (deleteFirstsBy, foldl', partition, (\\))
|
||||
import Data.List.NonEmpty (NonEmpty (..), (<|))
|
||||
@@ -1405,6 +1405,9 @@ subscribeQueues c qs = do
|
||||
let (userId, srv, _) = transportSession' smp
|
||||
atomically $ incSMPServerStat' c userId srv connSubAttempts (length qs')
|
||||
rs <- sendBatch subscribeSMPQueues smp qs'
|
||||
let (successes, errs) = partition (isRight . snd) (L.toList rs)
|
||||
atomically $ incSMPServerStat' c userId srv connSubscribed (length successes)
|
||||
atomically $ incSMPServerStat' c userId srv connSubErrs (length errs)
|
||||
active <-
|
||||
atomically $
|
||||
ifM
|
||||
|
||||
Reference in New Issue
Block a user