Files
simplexmq/src/Simplex/Messaging/Client/Agent.hs
Evgeny cb3250e7b4 servers: better socket leak prevention during TLS handshake, add NetworkError type to better diagnose connection errors (#1619)
* servers: better socket leak prevention during TLS handshake

* log tcp connection errors

* more detailed network error

* log full address

* rename error

* add encodings for NetworkError

* refactor

* comment

* bind

* style

* remove parameters of NETWORK error from encoding
2025-09-02 16:07:37 +01:00

581 lines
26 KiB
Haskell

{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
module Simplex.Messaging.Client.Agent
( SMPClientAgent (..),
SMPClientAgentConfig (..),
SMPClientAgentEvent (..),
OwnServer,
defaultSMPClientAgentConfig,
newSMPClientAgent,
getSMPServerClient'',
getConnectedSMPServerClient,
closeSMPClientAgent,
lookupSMPServerClient,
isOwnServer,
subscribeServiceNtfs,
subscribeQueuesNtfs,
activeClientSession',
removeActiveSub,
removeActiveSubs,
removePendingSub,
removePendingSubs,
)
where
import Control.Concurrent (forkIO)
import Control.Concurrent.Async (Async, uninterruptibleCancel)
import Control.Concurrent.STM (retry)
import Control.Logger.Simple
import Control.Monad
import Control.Monad.Except
import Control.Monad.IO.Unlift
import Control.Monad.Trans.Except
import Crypto.Random (ChaChaDRG)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Constraint (Dict (..))
import Data.Int (Int64)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as L
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (isJust, isNothing)
import qualified Data.Set as S
import Data.Text.Encoding
import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime)
import Numeric.Natural
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Client
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol
( BrokerMsg,
ErrorType,
NotifierId,
NtfPrivateAuthKey,
Party (..),
PartyI,
ProtocolServer (..),
QueueId,
SMPServer,
SParty (..),
ServiceParty,
serviceParty,
partyServiceRole
)
import Simplex.Messaging.Session
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport
import Simplex.Messaging.Util (catchAll_, ifM, safeDecodeUtf8, toChunks, tshow, whenM, ($>>=), (<$$>))
import System.Timeout (timeout)
import UnliftIO (async)
import qualified UnliftIO.Exception as E
import UnliftIO.STM
type SMPClientVar = SessionVar (Either (SMPClientError, Maybe UTCTime) (OwnServer, SMPClient))
data SMPClientAgentEvent
= CAConnected SMPServer (Maybe ServiceId)
| CADisconnected SMPServer (NonEmpty QueueId)
| CASubscribed SMPServer (Maybe ServiceId) (NonEmpty QueueId)
| CASubError SMPServer (NonEmpty (QueueId, SMPClientError))
| CAServiceDisconnected SMPServer (ServiceId, Int64)
| CAServiceSubscribed SMPServer (ServiceId, Int64) Int64
| CAServiceSubError SMPServer (ServiceId, Int64) SMPClientError
-- CAServiceUnavailable is used when service ID in pending subscription is different from the current service in connection.
-- This will require resubscribing to all queues associated with this service ID individually, creating new associations.
-- It may happen if, for example, SMP server deletes service information (e.g. via downgrade and upgrade)
-- and assigns different service ID to the service certificate.
| CAServiceUnavailable SMPServer (ServiceId, Int64)
data SMPClientAgentConfig = SMPClientAgentConfig
{ smpCfg :: ProtocolClientConfig SMPVersion,
reconnectInterval :: RetryInterval,
persistErrorInterval :: NominalDiffTime,
msgQSize :: Natural,
agentQSize :: Natural,
agentSubsBatchSize :: Int,
ownServerDomains :: [ByteString]
}
defaultSMPClientAgentConfig :: SMPClientAgentConfig
defaultSMPClientAgentConfig =
SMPClientAgentConfig
{ smpCfg = defaultSMPClientConfig,
reconnectInterval =
RetryInterval
{ initialInterval = second,
increaseAfter = 10 * second,
maxInterval = 10 * second
},
persistErrorInterval = 30, -- seconds
msgQSize = 2048,
agentQSize = 2048,
agentSubsBatchSize = 1360,
ownServerDomains = []
}
where
second = 1000000
data SMPClientAgent p = SMPClientAgent
{ agentCfg :: SMPClientAgentConfig,
agentParty :: SParty p,
active :: TVar Bool,
startedAt :: UTCTime,
msgQ :: TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg),
agentQ :: TBQueue SMPClientAgentEvent,
randomDrg :: TVar ChaChaDRG,
smpClients :: TMap SMPServer SMPClientVar,
smpSessions :: TMap SessionId (OwnServer, SMPClient),
-- Only one service subscription can exist per server with this agent.
-- With correctly functioning SMP server, queue and service subscriptions can't be
-- active at the same time.
activeServiceSubs :: TMap SMPServer (TVar (Maybe ((ServiceId, Int64), SessionId))),
activeQueueSubs :: TMap SMPServer (TMap QueueId (SessionId, C.APrivateAuthKey)),
-- Pending service subscriptions can co-exist with pending queue subscriptions
-- on the same SMP server during subscriptions being transitioned from per-queue to service.
pendingServiceSubs :: TMap SMPServer (TVar (Maybe (ServiceId, Int64))),
pendingQueueSubs :: TMap SMPServer (TMap QueueId C.APrivateAuthKey),
smpSubWorkers :: TMap SMPServer (SessionVar (Async ())),
workerSeq :: TVar Int
}
type OwnServer = Bool
newSMPClientAgent :: SParty p -> SMPClientAgentConfig -> TVar ChaChaDRG -> IO (SMPClientAgent p)
newSMPClientAgent agentParty agentCfg@SMPClientAgentConfig {msgQSize, agentQSize} randomDrg = do
active <- newTVarIO True
startedAt <- getCurrentTime
msgQ <- newTBQueueIO msgQSize
agentQ <- newTBQueueIO agentQSize
smpClients <- TM.emptyIO
smpSessions <- TM.emptyIO
activeServiceSubs <- TM.emptyIO
activeQueueSubs <- TM.emptyIO
pendingServiceSubs <- TM.emptyIO
pendingQueueSubs <- TM.emptyIO
smpSubWorkers <- TM.emptyIO
workerSeq <- newTVarIO 0
pure
SMPClientAgent
{ agentCfg,
agentParty,
active,
startedAt,
msgQ,
agentQ,
randomDrg,
smpClients,
smpSessions,
activeServiceSubs,
activeQueueSubs,
pendingServiceSubs,
pendingQueueSubs,
smpSubWorkers,
workerSeq
}
-- | Get or create SMP client for SMPServer
getSMPServerClient' :: SMPClientAgent p -> SMPServer -> ExceptT SMPClientError IO SMPClient
getSMPServerClient' ca srv = snd <$> getSMPServerClient'' ca srv
{-# INLINE getSMPServerClient' #-}
getSMPServerClient'' :: SMPClientAgent p -> SMPServer -> ExceptT SMPClientError IO (OwnServer, SMPClient)
getSMPServerClient'' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, workerSeq} srv = do
ts <- liftIO getCurrentTime
atomically (getClientVar ts) >>= either (ExceptT . newSMPClient) waitForSMPClient
where
getClientVar :: UTCTime -> STM (Either SMPClientVar SMPClientVar)
getClientVar = getSessVar workerSeq srv smpClients
waitForSMPClient :: SMPClientVar -> ExceptT SMPClientError IO (OwnServer, SMPClient)
waitForSMPClient v = do
let ProtocolClientConfig {networkConfig = NetworkConfig {tcpConnectTimeout}} = smpCfg agentCfg
smpClient_ <- liftIO $ netTimeoutInt tcpConnectTimeout NRMBackground `timeout` atomically (readTMVar $ sessionVar v)
case smpClient_ of
Just (Right smpClient) -> pure smpClient
Just (Left (e, ts_)) -> case ts_ of
Nothing -> throwE e
Just ts ->
ifM
((ts <) <$> liftIO getCurrentTime)
(atomically (removeSessVar v srv smpClients) >> getSMPServerClient'' ca srv)
(throwE e)
Nothing -> throwE PCEResponseTimeout
newSMPClient :: SMPClientVar -> IO (Either SMPClientError (OwnServer, SMPClient))
newSMPClient v = do
r <- connectClient ca srv v `E.catch` (pure . Left . PCEIOError)
case r of
Right smp -> do
logInfo . decodeUtf8 $ "Agent connected to " <> showServer srv
let !owned = isOwnServer ca srv
!c = (owned, smp)
atomically $ do
putTMVar (sessionVar v) (Right c)
TM.insert (sessionId $ thParams smp) c smpSessions
let serviceId_ = (\THClientService {serviceId} -> serviceId) <$> smpClientService smp
notify ca $ CAConnected srv serviceId_
pure $ Right c
Left e -> do
let ei = persistErrorInterval agentCfg
if ei == 0
then atomically $ do
putTMVar (sessionVar v) (Left (e, Nothing))
removeSessVar v srv smpClients
else do
ts <- addUTCTime ei <$> liftIO getCurrentTime
atomically $ putTMVar (sessionVar v) (Left (e, Just ts))
reconnectClient ca srv
pure $ Left e
isOwnServer :: SMPClientAgent p -> SMPServer -> OwnServer
isOwnServer SMPClientAgent {agentCfg} ProtocolServer {host} =
let srv = strEncode $ L.head host
in any (\s -> s == srv || B.cons '.' s `B.isSuffixOf` srv) (ownServerDomains agentCfg)
-- | Run an SMP client for SMPClientVar
connectClient :: SMPClientAgent p -> SMPServer -> SMPClientVar -> IO (Either SMPClientError SMPClient)
connectClient ca@SMPClientAgent {agentCfg, smpClients, smpSessions, msgQ, randomDrg, startedAt} srv v =
getProtocolClient randomDrg NRMBackground (1, srv, Nothing) (smpCfg agentCfg) [] (Just msgQ) startedAt clientDisconnected
where
clientDisconnected :: SMPClient -> IO ()
clientDisconnected smp = do
removeClientAndSubs smp >>= serverDown
logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv
removeClientAndSubs :: SMPClient -> IO (Maybe (ServiceId, Int64), Maybe (Map QueueId C.APrivateAuthKey))
removeClientAndSubs smp = do
-- Looking up subscription vars outside of STM transaction to reduce re-evaluation.
-- It is possible because these vars are never removed, they are only added.
sVar_ <- TM.lookupIO srv $ activeServiceSubs ca
qVar_ <- TM.lookupIO srv $ activeQueueSubs ca
atomically $ do
TM.delete sessId smpSessions
removeSessVar v srv smpClients
sSub <- pure sVar_ $>>= updateServiceSub
qSubs <- pure qVar_ $>>= updateQueueSubs
pure (sSub, qSubs)
where
sessId = sessionId $ thParams smp
updateServiceSub sVar = do -- (sub, sessId')
-- We don't change active subscription in case session ID is different from disconnected client
serviceSub_ <- stateTVar sVar $ \case
Just (serviceSub, sessId') | sessId == sessId' -> (Just serviceSub, Nothing)
s -> (Nothing, s)
-- We don't reset pending subscription to Nothing here to avoid any race conditions
-- with subsequent client sessions that might have set pending already.
when (isJust serviceSub_) $ setPendingServiceSub ca srv serviceSub_
pure serviceSub_
updateQueueSubs qVar = do
-- removing subscriptions that have matching sessionId to disconnected client
-- and keep the other ones (they can be made by the new client)
subs <- M.map snd <$> stateTVar qVar (M.partition ((sessId ==) . fst))
if M.null subs
then pure Nothing
else Just subs <$ addSubs_ (pendingQueueSubs ca) srv subs
serverDown :: (Maybe (ServiceId, Int64), Maybe (Map QueueId C.APrivateAuthKey)) -> IO ()
serverDown (sSub, qSubs) = do
mapM_ (notify ca . CAServiceDisconnected srv) sSub
let qIds = L.nonEmpty . M.keys =<< qSubs
mapM_ (notify ca . CADisconnected srv) qIds
when (isJust sSub || isJust qIds) $ reconnectClient ca srv
-- | Spawn reconnect worker if needed
reconnectClient :: SMPClientAgent p -> SMPServer -> IO ()
reconnectClient ca@SMPClientAgent {active, agentCfg, smpSubWorkers, workerSeq} srv = do
ts <- getCurrentTime
whenM (readTVarIO active) $ atomically (getWorkerVar ts) >>= mapM_ (either newSubWorker (\_ -> pure ()))
where
getWorkerVar ts =
ifM
(noPending <$> getPending TM.lookup readTVar)
(pure Nothing) -- prevent race with cleanup and adding pending queues in another call
(Just <$> getSessVar workerSeq srv smpSubWorkers ts)
newSubWorker :: SessionVar (Async ()) -> IO ()
newSubWorker v = do
a <- async $ void (E.tryAny runSubWorker) >> atomically (cleanup v)
atomically $ putTMVar (sessionVar v) a
runSubWorker =
withRetryInterval (reconnectInterval agentCfg) $ \_ loop -> do
subs <- getPending TM.lookupIO readTVarIO
unless (noPending subs) $ whenM (readTVarIO active) $ do
void $ netTimeoutInt tcpConnectTimeout NRMBackground `timeout` runExceptT (reconnectSMPClient ca srv subs)
loop
ProtocolClientConfig {networkConfig = NetworkConfig {tcpConnectTimeout}} = smpCfg agentCfg
noPending (sSub, qSubs) = isNothing sSub && maybe True M.null qSubs
getPending :: Monad m => (forall a. SMPServer -> TMap SMPServer a -> m (Maybe a)) -> (forall a. TVar a -> m a) -> m (Maybe (ServiceId, Int64), Maybe (Map QueueId C.APrivateAuthKey))
getPending lkup rd = do
sSub <- lkup srv (pendingServiceSubs ca) $>>= rd
qSubs <- lkup srv (pendingQueueSubs ca) >>= mapM rd
pure (sSub, qSubs)
cleanup :: SessionVar (Async ()) -> STM ()
cleanup v = do
-- Here we wait until TMVar is not empty to prevent worker cleanup happening before worker is added to TMVar.
-- Not waiting may result in terminated worker remaining in the map.
whenM (isEmptyTMVar $ sessionVar v) retry
removeSessVar v srv smpSubWorkers
reconnectSMPClient :: forall p. SMPClientAgent p -> SMPServer -> (Maybe (ServiceId, Int64), Maybe (Map QueueId C.APrivateAuthKey)) -> ExceptT SMPClientError IO ()
reconnectSMPClient ca@SMPClientAgent {agentCfg, agentParty} srv (sSub_, qSubs_) =
withSMP ca srv $ \smp -> liftIO $ case serviceParty agentParty of
Just Dict -> resubscribe smp
Nothing -> pure ()
where
resubscribe :: (PartyI p, ServiceParty p) => SMPClient -> IO ()
resubscribe smp = do
mapM_ (smpSubscribeService ca smp srv) sSub_
forM_ qSubs_ $ \qSubs -> do
currSubs_ <- mapM readTVarIO =<< TM.lookupIO srv (activeQueueSubs ca)
let qSubs' :: [(QueueId, C.APrivateAuthKey)] =
maybe id (\currSubs -> filter ((`M.notMember` currSubs) . fst)) currSubs_ $ M.assocs qSubs
mapM_ (smpSubscribeQueues @p ca smp srv) $ toChunks (agentSubsBatchSize agentCfg) qSubs'
notify :: MonadIO m => SMPClientAgent p -> SMPClientAgentEvent -> m ()
notify ca evt = atomically $ writeTBQueue (agentQ ca) evt
{-# INLINE notify #-}
-- Returns already connected client for proxying messages or Nothing if client is absent, not connected yet or stores expired error.
-- If Nothing is return proxy will spawn a new thread to wait or to create another client connection to destination relay.
getConnectedSMPServerClient :: SMPClientAgent p -> SMPServer -> IO (Maybe (Either SMPClientError (OwnServer, SMPClient)))
getConnectedSMPServerClient SMPClientAgent {smpClients} srv =
atomically (TM.lookup srv smpClients $>>= \v -> (v,) <$$> tryReadTMVar (sessionVar v)) -- Nothing: client is absent or not connected yet
$>>= \case
(_, Right r) -> pure $ Just $ Right r
(v, Left (e, ts_)) ->
pure ts_ $>>= \ts ->
-- proxy will create a new connection if ts_ is Nothing
ifM
((ts <) <$> liftIO getCurrentTime) -- error persistence interval period expired?
(Nothing <$ atomically (removeSessVar v srv smpClients)) -- proxy will create a new connection
(pure $ Just $ Left e) -- not expired, returning error
lookupSMPServerClient :: SMPClientAgent p -> SessionId -> IO (Maybe (OwnServer, SMPClient))
lookupSMPServerClient SMPClientAgent {smpSessions} sessId = TM.lookupIO sessId smpSessions
closeSMPClientAgent :: SMPClientAgent p -> IO ()
closeSMPClientAgent c = do
atomically $ writeTVar (active c) False
closeSMPServerClients c
atomically (swapTVar (smpSubWorkers c) M.empty) >>= mapM_ cancelReconnect
where
cancelReconnect :: SessionVar (Async ()) -> IO ()
cancelReconnect v = void . forkIO $ atomically (readTMVar $ sessionVar v) >>= uninterruptibleCancel
closeSMPServerClients :: SMPClientAgent p -> IO ()
closeSMPServerClients c = atomically (smpClients c `swapTVar` M.empty) >>= mapM_ (forkIO . closeClient)
where
closeClient v =
atomically (readTMVar $ sessionVar v) >>= \case
Right (_, smp) -> closeProtocolClient smp `catchAll_` pure ()
_ -> pure ()
cancelActions :: Foldable f => TVar (f (Async ())) -> IO ()
cancelActions as = readTVarIO as >>= mapM_ uninterruptibleCancel
withSMP :: SMPClientAgent p -> SMPServer -> (SMPClient -> ExceptT SMPClientError IO a) -> ExceptT SMPClientError IO a
withSMP ca srv action = (getSMPServerClient' ca srv >>= action) `catchE` logSMPError
where
logSMPError :: SMPClientError -> ExceptT SMPClientError IO a
logSMPError e = do
logInfo $ "SMP error (" <> safeDecodeUtf8 (strEncode srv) <> "): " <> tshow e
throwE e
subscribeQueuesNtfs :: SMPClientAgent 'NotifierService -> SMPServer -> NonEmpty (NotifierId, NtfPrivateAuthKey) -> IO ()
subscribeQueuesNtfs = subscribeQueues_
{-# INLINE subscribeQueuesNtfs #-}
subscribeQueues_ :: ServiceParty p => SMPClientAgent p -> SMPServer -> NonEmpty (QueueId, C.APrivateAuthKey) -> IO ()
subscribeQueues_ ca srv subs = do
atomically $ addPendingSubs ca srv $ L.toList subs
runExceptT (getSMPServerClient' ca srv) >>= \case
Right smp -> smpSubscribeQueues ca smp srv subs
Left _ -> pure () -- no call to reconnectClient - failing getSMPServerClient' does that
smpSubscribeQueues :: ServiceParty p => SMPClientAgent p -> SMPClient -> SMPServer -> NonEmpty (QueueId, C.APrivateAuthKey) -> IO ()
smpSubscribeQueues ca smp srv subs = do
rs <- case agentParty ca of
SRecipientService -> subscribeSMPQueues smp subs
SNotifierService -> subscribeSMPQueuesNtfs smp subs
rs' <-
atomically $
ifM
(activeClientSession ca smp srv)
(Just <$> processSubscriptions rs)
(pure Nothing)
case rs' of
Just (tempErrs, finalErrs, (qOks, sQs), _) -> do
notify_ (`CASubscribed` Nothing) $ map fst qOks
when (isJust smpServiceId) $ notify_ (`CASubscribed` smpServiceId) sQs
notify_ CASubError finalErrs
when tempErrs $ reconnectClient ca srv
Nothing -> reconnectClient ca srv
where
processSubscriptions :: NonEmpty (Either SMPClientError (Maybe ServiceId)) -> STM (Bool, [(QueueId, SMPClientError)], ([(QueueId, (SessionId, C.APrivateAuthKey))], [QueueId]), [QueueId])
processSubscriptions rs = do
pending <- maybe (pure M.empty) readTVar =<< TM.lookup srv (pendingQueueSubs ca)
let acc@(_, _, (qOks, sQs), notPending) = foldr (groupSub pending) (False, [], ([], []), []) (L.zip subs rs)
unless (null qOks) $ addActiveSubs ca srv qOks
unless (null sQs) $ forM_ smpServiceId $ \serviceId ->
updateActiveServiceSub ca srv ((serviceId, fromIntegral $ length sQs), sessId)
unless (null notPending) $ removePendingSubs ca srv notPending
pure acc
sessId = sessionId $ thParams smp
smpServiceId = (\THClientService {serviceId} -> serviceId) <$> smpClientService smp
groupSub ::
Map QueueId C.APrivateAuthKey ->
((QueueId, C.APrivateAuthKey), Either SMPClientError (Maybe ServiceId)) ->
(Bool, [(QueueId, SMPClientError)], ([(QueueId, (SessionId, C.APrivateAuthKey))], [QueueId]), [QueueId]) ->
(Bool, [(QueueId, SMPClientError)], ([(QueueId, (SessionId, C.APrivateAuthKey))], [QueueId]), [QueueId])
groupSub pending ((qId, pk), r) acc@(!tempErrs, finalErrs, oks@(qOks, sQs), notPending) = case r of
Right serviceId_
| M.member qId pending ->
let oks' = case (smpServiceId, serviceId_) of
(Just sId, Just sId') | sId == sId' -> (qOks, qId : sQs)
_ -> ((qId, (sessId, pk)) : qOks, sQs)
in (tempErrs, finalErrs, oks', qId : notPending)
| otherwise -> acc
Left e
| temporaryClientError e -> (True, finalErrs, oks, notPending)
| otherwise -> (tempErrs, (qId, e) : finalErrs, oks, qId : notPending)
notify_ :: (SMPServer -> NonEmpty a -> SMPClientAgentEvent) -> [a] -> IO ()
notify_ evt qs = mapM_ (notify ca . evt srv) $ L.nonEmpty qs
subscribeServiceNtfs :: SMPClientAgent 'NotifierService -> SMPServer -> (ServiceId, Int64) -> IO ()
subscribeServiceNtfs = subscribeService_
{-# INLINE subscribeServiceNtfs #-}
subscribeService_ :: (PartyI p, ServiceParty p) => SMPClientAgent p -> SMPServer -> (ServiceId, Int64) -> IO ()
subscribeService_ ca srv serviceSub = do
atomically $ setPendingServiceSub ca srv $ Just serviceSub
runExceptT (getSMPServerClient' ca srv) >>= \case
Right smp -> smpSubscribeService ca smp srv serviceSub
Left _ -> pure () -- no call to reconnectClient - failing getSMPServerClient' does that
smpSubscribeService :: (PartyI p, ServiceParty p) => SMPClientAgent p -> SMPClient -> SMPServer -> (ServiceId, Int64) -> IO ()
smpSubscribeService ca smp srv serviceSub@(serviceId, _) = case smpClientService smp of
Just service | serviceAvailable service -> subscribe
_ -> notifyUnavailable
where
subscribe = do
r <- runExceptT $ subscribeService smp $ agentParty ca
ok <-
atomically $
ifM
(activeClientSession ca smp srv)
(True <$ processSubscription r)
(pure False)
if ok
then case r of
Right n -> notify ca $ CAServiceSubscribed srv serviceSub n
Left e
| smpClientServiceError e -> notifyUnavailable
| temporaryClientError e -> reconnectClient ca srv
| otherwise -> notify ca $ CAServiceSubError srv serviceSub e
else reconnectClient ca srv
processSubscription = mapM_ $ \n -> do
setActiveServiceSub ca srv $ Just ((serviceId, n), sessId)
setPendingServiceSub ca srv Nothing
serviceAvailable THClientService {serviceRole, serviceId = serviceId'} =
serviceId == serviceId' && partyServiceRole (agentParty ca) == serviceRole
notifyUnavailable = do
atomically $ setPendingServiceSub ca srv Nothing
notify ca $ CAServiceUnavailable srv serviceSub -- this will resubscribe all queues directly
sessId = sessionId $ thParams smp
activeClientSession' :: SMPClientAgent p -> SessionId -> SMPServer -> STM Bool
activeClientSession' ca sessId srv = sameSess <$> tryReadSessVar srv (smpClients ca)
where
sameSess = \case
Just (Right (_, smp')) -> sessId == sessionId (thParams smp')
_ -> False
activeClientSession :: SMPClientAgent p -> SMPClient -> SMPServer -> STM Bool
activeClientSession ca = activeClientSession' ca . sessionId . thParams
showServer :: SMPServer -> ByteString
showServer ProtocolServer {host, port} =
strEncode host <> B.pack (if null port then "" else ':' : port)
addActiveSubs :: SMPClientAgent p -> SMPServer -> [(QueueId, (SessionId, C.APrivateAuthKey))] -> STM ()
addActiveSubs = addSubsList_ . activeQueueSubs
{-# INLINE addActiveSubs #-}
addPendingSubs :: SMPClientAgent p -> SMPServer -> [(QueueId, C.APrivateAuthKey)] -> STM ()
addPendingSubs = addSubsList_ . pendingQueueSubs
{-# INLINE addPendingSubs #-}
addSubsList_ :: TMap SMPServer (TMap QueueId s) -> SMPServer -> [(QueueId, s)] -> STM ()
addSubsList_ subs srv ss = addSubs_ subs srv $ M.fromList ss
-- where
-- ss' = M.fromList $ map (first (party,)) ss
addSubs_ :: TMap SMPServer (TMap QueueId s) -> SMPServer -> Map QueueId s -> STM ()
addSubs_ subs srv ss =
TM.lookup srv subs >>= \case
Just m -> TM.union ss m
_ -> TM.insertM srv (newTVar ss) subs
setActiveServiceSub :: SMPClientAgent p -> SMPServer -> Maybe ((ServiceId, Int64), SessionId) -> STM ()
setActiveServiceSub = setServiceSub_ activeServiceSubs
{-# INLINE setActiveServiceSub #-}
setPendingServiceSub :: SMPClientAgent p -> SMPServer -> Maybe (ServiceId, Int64) -> STM ()
setPendingServiceSub = setServiceSub_ pendingServiceSubs
{-# INLINE setPendingServiceSub #-}
setServiceSub_ ::
(SMPClientAgent p -> TMap SMPServer (TVar (Maybe sub))) ->
SMPClientAgent p ->
SMPServer ->
Maybe sub ->
STM ()
setServiceSub_ subsSel ca srv sub =
TM.lookup srv (subsSel ca) >>= \case
Just v -> writeTVar v sub
Nothing -> TM.insertM srv (newTVar sub) (subsSel ca)
updateActiveServiceSub :: SMPClientAgent p -> SMPServer -> ((ServiceId, Int64), SessionId) -> STM ()
updateActiveServiceSub ca srv sub@((serviceId', n'), sessId') =
TM.lookup srv (activeServiceSubs ca) >>= \case
Just v -> modifyTVar' v $ \case
Just ((serviceId, n), sessId) | serviceId == serviceId' && sessId == sessId' ->
Just ((serviceId, n + n'), sessId)
_ -> Just sub
Nothing -> TM.insertM srv (newTVar $ Just sub) (activeServiceSubs ca)
removeActiveSub :: SMPClientAgent p -> SMPServer -> QueueId -> STM ()
removeActiveSub = removeSub_ . activeQueueSubs
{-# INLINE removeActiveSub #-}
removePendingSub :: SMPClientAgent p -> SMPServer -> QueueId -> STM ()
removePendingSub = removeSub_ . pendingQueueSubs
{-# INLINE removePendingSub #-}
removeSub_ :: TMap SMPServer (TMap QueueId s) -> SMPServer -> QueueId -> STM ()
removeSub_ subs srv s = TM.lookup srv subs >>= mapM_ (TM.delete s)
removeActiveSubs :: SMPClientAgent p -> SMPServer -> [QueueId] -> STM ()
removeActiveSubs = removeSubs_ . activeQueueSubs
{-# INLINE removeActiveSubs #-}
removePendingSubs :: SMPClientAgent p -> SMPServer -> [QueueId] -> STM ()
removePendingSubs = removeSubs_ . pendingQueueSubs
{-# INLINE removePendingSubs #-}
removeSubs_ :: TMap SMPServer (TMap QueueId s) -> SMPServer -> [QueueId] -> STM ()
removeSubs_ subs srv qs = TM.lookup srv subs >>= mapM_ (`modifyTVar'` (`M.withoutKeys` S.fromList qs))