mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-06-04 23:51:33 +00:00
committed by
GitHub
parent
deb3fc7359
commit
9d12d76078
@@ -18,7 +18,7 @@ import Control.Monad
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.IO.Unlift
|
||||
import Control.Monad.Trans.Except
|
||||
import Data.Bifunctor (first, bimap)
|
||||
import Data.Bifunctor (bimap, first)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Either (partitionEithers)
|
||||
@@ -36,11 +36,11 @@ import Simplex.Messaging.Agent.RetryInterval
|
||||
import Simplex.Messaging.Client
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Protocol (BrokerMsg, ProtocolServer (..), QueueId, SMPServer, NtfPrivateSignKey, NotifierId, RcvPrivateSignKey, RecipientId)
|
||||
import Simplex.Messaging.Protocol (BrokerMsg, NotifierId, NtfPrivateSignKey, ProtocolServer (..), QueueId, RcvPrivateSignKey, RecipientId, SMPServer)
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Transport
|
||||
import Simplex.Messaging.Util (catchAll_, ($>>=), toChunks)
|
||||
import Simplex.Messaging.Util (catchAll_, toChunks, ($>>=))
|
||||
import System.Timeout (timeout)
|
||||
import UnliftIO (async)
|
||||
import UnliftIO.Exception (Exception)
|
||||
@@ -238,7 +238,7 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, msgQ} srv =
|
||||
(tempErrs, finalErrs) = partition (temporaryClientError . snd) errs
|
||||
mapM_ (atomically . addSubscription ca srv) oks
|
||||
mapM_ (liftIO . notify . CAResubscribed srv) $ L.nonEmpty $ map fst oks
|
||||
mapM_ (atomically . removePendingSubscription ca srv . fst) finalErrs
|
||||
mapM_ (atomically . removePendingSubscription ca srv . fst) finalErrs
|
||||
mapM_ (liftIO . notify . CASubError srv) $ L.nonEmpty finalErrs
|
||||
mapM_ (throwE . snd) $ listToMaybe tempErrs
|
||||
|
||||
@@ -281,7 +281,7 @@ subscribeQueue ca srv sub = do
|
||||
|
||||
handleErr e = do
|
||||
atomically . when (e /= PCENetworkError && e /= PCEResponseTimeout) $
|
||||
removePendingSubscription ca srv $ fst sub
|
||||
removePendingSubscription ca srv (fst sub)
|
||||
throwE e
|
||||
|
||||
subscribeQueuesSMP :: SMPClientAgent -> SMPServer -> NonEmpty (RecipientId, RcvPrivateSignKey) -> IO (NonEmpty (RecipientId, Either SMPClientError ()))
|
||||
@@ -300,14 +300,15 @@ subscribeQueues_ party ca srv subs = do
|
||||
smpSubscribeQueues :: SMPSubParty -> SMPClientAgent -> SMPClient -> SMPServer -> NonEmpty (QueueId, C.APrivateSignKey) -> IO (NonEmpty (QueueId, Either SMPClientError ()))
|
||||
smpSubscribeQueues party ca smp srv subs = do
|
||||
rs <- L.zip subs <$> subscribe smp (L.map swap subs)
|
||||
atomically $ forM rs $ \(sub, r) -> (fst sub,) <$> case r of
|
||||
Right () -> do
|
||||
addSubscription ca srv $ first (party,) sub
|
||||
pure $ Right ()
|
||||
Left e -> do
|
||||
when (e /= PCENetworkError && e /= PCEResponseTimeout) $
|
||||
removePendingSubscription ca srv $ (party,) $ fst sub
|
||||
pure $ Left e
|
||||
atomically $ forM rs $ \(sub, r) ->
|
||||
(fst sub,) <$> case r of
|
||||
Right () -> do
|
||||
addSubscription ca srv $ first (party,) sub
|
||||
pure $ Right ()
|
||||
Left e -> do
|
||||
when (e /= PCENetworkError && e /= PCEResponseTimeout) $
|
||||
removePendingSubscription ca srv (party, fst sub)
|
||||
pure $ Left e
|
||||
where
|
||||
subscribe = case party of
|
||||
SPRecipient -> subscribeSMPQueues
|
||||
|
||||
Reference in New Issue
Block a user