Files
simplexmq/src/Simplex/Messaging/Client/Agent.hs
Evgeny db4b27e88a agent: create user with option to enable client service (#1684)
* agent: create user with option to enable client service

* handle HTTP2 errors

* do not catch async exceptions
2025-12-27 09:12:22 +00:00

598 lines
26 KiB
Haskell

{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
module Simplex.Messaging.Client.Agent
( SMPClientAgent (..),
SMPClientAgentConfig (..),
SMPClientAgentEvent (..),
DBService (..),
OwnServer,
defaultSMPClientAgentConfig,
newSMPClientAgent,
getSMPServerClient'',
getConnectedSMPServerClient,
closeSMPClientAgent,
lookupSMPServerClient,
isOwnServer,
subscribeServiceNtfs,
subscribeQueuesNtfs,
activeClientSession',
removeActiveSub,
removeActiveSubs,
removePendingSub,
removePendingSubs,
)
where
import Control.Concurrent (forkIO)
import Control.Concurrent.Async (Async, uninterruptibleCancel)
import Control.Concurrent.STM (retry)
import qualified Control.Exception as E
import Control.Logger.Simple
import Control.Monad
import Control.Monad.Except
import Control.Monad.IO.Unlift
import Control.Monad.Trans.Except
import Crypto.Random (ChaChaDRG)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Constraint (Dict (..))
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as L
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (isJust, isNothing)
import qualified Data.Set as S
import Data.Text.Encoding
import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime)
import Numeric.Natural
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Client
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol
( BrokerMsg,
ErrorType,
NotifierId,
NtfPrivateAuthKey,
Party (..),
PartyI,
ProtocolServer (..),
QueueId,
SMPServer,
ServiceSub (..),
SParty (..),
ServiceParty,
serviceParty,
partyServiceRole,
queueIdsHash,
)
import Simplex.Messaging.Session
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport
import Simplex.Messaging.Util (catchAll_, ifM, safeDecodeUtf8, toChunks, tshow, whenM, ($>>=), (<$$>))
import System.Timeout (timeout)
import UnliftIO (async)
import UnliftIO.STM
type SMPClientVar = SessionVar (Either (SMPClientError, Maybe UTCTime) (OwnServer, SMPClient))
data SMPClientAgentEvent
= CAConnected SMPServer (Maybe ServiceId)
| CADisconnected SMPServer (NonEmpty QueueId)
| CASubscribed SMPServer (Maybe ServiceId) (NonEmpty QueueId)
| CASubError SMPServer (NonEmpty (QueueId, SMPClientError))
| CAServiceDisconnected SMPServer ServiceSub
| CAServiceSubscribed {subServer :: SMPServer, expected :: ServiceSub, subscribed :: ServiceSub}
| CAServiceSubError SMPServer ServiceSub SMPClientError
-- CAServiceUnavailable is used when service ID in pending subscription is different from the current service in connection.
-- This will require resubscribing to all queues associated with this service ID individually, creating new associations.
-- It may happen if, for example, SMP server deletes service information (e.g. via downgrade and upgrade)
-- and assigns different service ID to the service certificate.
| CAServiceUnavailable SMPServer ServiceSub
data SMPClientAgentConfig = SMPClientAgentConfig
{ smpCfg :: ProtocolClientConfig SMPVersion,
reconnectInterval :: RetryInterval,
persistErrorInterval :: NominalDiffTime,
msgQSize :: Natural,
agentQSize :: Natural,
agentSubsBatchSize :: Int,
ownServerDomains :: [ByteString]
}
defaultSMPClientAgentConfig :: SMPClientAgentConfig
defaultSMPClientAgentConfig =
SMPClientAgentConfig
{ smpCfg = defaultSMPClientConfig,
reconnectInterval =
RetryInterval
{ initialInterval = second,
increaseAfter = 10 * second,
maxInterval = 10 * second
},
persistErrorInterval = 30, -- seconds
msgQSize = 2048,
agentQSize = 2048,
agentSubsBatchSize = 1360,
ownServerDomains = []
}
where
second = 1000000
data SMPClientAgent p = SMPClientAgent
{ agentCfg :: SMPClientAgentConfig,
agentParty :: SParty p,
dbService :: Maybe DBService,
active :: TVar Bool,
startedAt :: UTCTime,
msgQ :: TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg),
agentQ :: TBQueue SMPClientAgentEvent,
randomDrg :: TVar ChaChaDRG,
smpClients :: TMap SMPServer SMPClientVar,
smpSessions :: TMap SessionId (OwnServer, SMPClient),
-- Only one service subscription can exist per server with this agent.
-- With correctly functioning SMP server, queue and service subscriptions can't be
-- active at the same time.
activeServiceSubs :: TMap SMPServer (TVar (Maybe (ServiceSub, SessionId))),
activeQueueSubs :: TMap SMPServer (TMap QueueId (SessionId, C.APrivateAuthKey)),
-- Pending service subscriptions can co-exist with pending queue subscriptions
-- on the same SMP server during subscriptions being transitioned from per-queue to service.
pendingServiceSubs :: TMap SMPServer (TVar (Maybe ServiceSub)),
pendingQueueSubs :: TMap SMPServer (TMap QueueId C.APrivateAuthKey),
smpSubWorkers :: TMap SMPServer (SessionVar (Async ())),
workerSeq :: TVar Int
}
type OwnServer = Bool
newSMPClientAgent :: SParty p -> SMPClientAgentConfig -> Maybe DBService -> TVar ChaChaDRG -> IO (SMPClientAgent p)
newSMPClientAgent agentParty agentCfg@SMPClientAgentConfig {msgQSize, agentQSize} dbService randomDrg = do
active <- newTVarIO True
startedAt <- getCurrentTime
msgQ <- newTBQueueIO msgQSize
agentQ <- newTBQueueIO agentQSize
smpClients <- TM.emptyIO
smpSessions <- TM.emptyIO
activeServiceSubs <- TM.emptyIO
activeQueueSubs <- TM.emptyIO
pendingServiceSubs <- TM.emptyIO
pendingQueueSubs <- TM.emptyIO
smpSubWorkers <- TM.emptyIO
workerSeq <- newTVarIO 0
pure
SMPClientAgent
{ agentCfg,
agentParty,
dbService,
active,
startedAt,
msgQ,
agentQ,
randomDrg,
smpClients,
smpSessions,
activeServiceSubs,
activeQueueSubs,
pendingServiceSubs,
pendingQueueSubs,
smpSubWorkers,
workerSeq
}
data DBService = DBService
{ getCredentials :: SMPServer -> IO (Either SMPClientError ServiceCredentials),
updateServiceId :: SMPServer -> Maybe ServiceId -> IO (Either SMPClientError ())
}
-- | Get or create SMP client for SMPServer
getSMPServerClient' :: SMPClientAgent p -> SMPServer -> ExceptT SMPClientError IO SMPClient
getSMPServerClient' ca srv = snd <$> getSMPServerClient'' ca srv
{-# INLINE getSMPServerClient' #-}
getSMPServerClient'' :: SMPClientAgent p -> SMPServer -> ExceptT SMPClientError IO (OwnServer, SMPClient)
getSMPServerClient'' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, workerSeq} srv = do
ts <- liftIO getCurrentTime
atomically (getClientVar ts) >>= either (ExceptT . newSMPClient) waitForSMPClient
where
getClientVar :: UTCTime -> STM (Either SMPClientVar SMPClientVar)
getClientVar = getSessVar workerSeq srv smpClients
waitForSMPClient :: SMPClientVar -> ExceptT SMPClientError IO (OwnServer, SMPClient)
waitForSMPClient v = do
let ProtocolClientConfig {networkConfig = NetworkConfig {tcpConnectTimeout}} = smpCfg agentCfg
smpClient_ <- liftIO $ netTimeoutInt tcpConnectTimeout NRMBackground `timeout` atomically (readTMVar $ sessionVar v)
case smpClient_ of
Just (Right smpClient) -> pure smpClient
Just (Left (e, ts_)) -> case ts_ of
Nothing -> throwE e
Just ts ->
ifM
((ts <) <$> liftIO getCurrentTime)
(atomically (removeSessVar v srv smpClients) >> getSMPServerClient'' ca srv)
(throwE e)
Nothing -> throwE PCEResponseTimeout
newSMPClient :: SMPClientVar -> IO (Either SMPClientError (OwnServer, SMPClient))
newSMPClient v = do
r <- connectClient ca srv v `E.catches` clientHandlers
case r of
Right smp -> do
logInfo . decodeUtf8 $ "Agent connected to " <> showServer srv
let !owned = isOwnServer ca srv
!c = (owned, smp)
atomically $ do
putTMVar (sessionVar v) (Right c)
TM.insert (sessionId $ thParams smp) c smpSessions
notify ca $ CAConnected srv $ smpClientServiceId smp
pure $ Right c
Left e -> do
let ei = persistErrorInterval agentCfg
if ei == 0
then atomically $ do
putTMVar (sessionVar v) (Left (e, Nothing))
removeSessVar v srv smpClients
else do
ts <- addUTCTime ei <$> liftIO getCurrentTime
atomically $ putTMVar (sessionVar v) (Left (e, Just ts))
reconnectClient ca srv
pure $ Left e
isOwnServer :: SMPClientAgent p -> SMPServer -> OwnServer
isOwnServer SMPClientAgent {agentCfg} ProtocolServer {host} =
let srv = strEncode $ L.head host
in any (\s -> s == srv || B.cons '.' s `B.isSuffixOf` srv) (ownServerDomains agentCfg)
-- | Run an SMP client for SMPClientVar
connectClient :: SMPClientAgent p -> SMPServer -> SMPClientVar -> IO (Either SMPClientError SMPClient)
connectClient ca@SMPClientAgent {agentCfg, dbService, smpClients, smpSessions, msgQ, randomDrg, startedAt} srv v = case dbService of
Just dbs -> runExceptT $ do
creds <- ExceptT $ getCredentials dbs srv
smp <- ExceptT $ getClient cfg {serviceCredentials = Just creds}
whenM (atomically $ activeClientSession ca smp srv) $
ExceptT $ updateServiceId dbs srv $ smpClientServiceId smp
pure smp
Nothing -> getClient cfg
where
cfg = smpCfg agentCfg
getClient cfg' = getProtocolClient randomDrg NRMBackground (1, srv, Nothing) cfg' [] (Just msgQ) startedAt clientDisconnected
clientDisconnected :: SMPClient -> IO ()
clientDisconnected smp = do
removeClientAndSubs smp >>= serverDown
logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv
removeClientAndSubs :: SMPClient -> IO (Maybe ServiceSub, Maybe (Map QueueId C.APrivateAuthKey))
removeClientAndSubs smp = do
-- Looking up subscription vars outside of STM transaction to reduce re-evaluation.
-- It is possible because these vars are never removed, they are only added.
sVar_ <- TM.lookupIO srv $ activeServiceSubs ca
qVar_ <- TM.lookupIO srv $ activeQueueSubs ca
atomically $ do
TM.delete sessId smpSessions
removeSessVar v srv smpClients
sSub <- pure sVar_ $>>= updateServiceSub
qSubs <- pure qVar_ $>>= updateQueueSubs
pure (sSub, qSubs)
where
sessId = sessionId $ thParams smp
updateServiceSub sVar = do -- (sub, sessId')
-- We don't change active subscription in case session ID is different from disconnected client
serviceSub_ <- stateTVar sVar $ \case
Just (serviceSub, sessId') | sessId == sessId' -> (Just serviceSub, Nothing)
s -> (Nothing, s)
-- We don't reset pending subscription to Nothing here to avoid any race conditions
-- with subsequent client sessions that might have set pending already.
when (isJust serviceSub_) $ setPendingServiceSub ca srv serviceSub_
pure serviceSub_
updateQueueSubs qVar = do
-- removing subscriptions that have matching sessionId to disconnected client
-- and keep the other ones (they can be made by the new client)
subs <- M.map snd <$> stateTVar qVar (M.partition ((sessId ==) . fst))
if M.null subs
then pure Nothing
else Just subs <$ addSubs_ (pendingQueueSubs ca) srv subs
serverDown :: (Maybe ServiceSub, Maybe (Map QueueId C.APrivateAuthKey)) -> IO ()
serverDown (sSub, qSubs) = do
mapM_ (notify ca . CAServiceDisconnected srv) sSub
let qIds = L.nonEmpty . M.keys =<< qSubs
mapM_ (notify ca . CADisconnected srv) qIds
when (isJust sSub || isJust qIds) $ reconnectClient ca srv
-- | Spawn reconnect worker if needed
reconnectClient :: SMPClientAgent p -> SMPServer -> IO ()
reconnectClient ca@SMPClientAgent {active, agentCfg, smpSubWorkers, workerSeq} srv = do
ts <- getCurrentTime
whenM (readTVarIO active) $ atomically (getWorkerVar ts) >>= mapM_ (either newSubWorker (\_ -> pure ()))
where
getWorkerVar ts =
ifM
(noPending <$> getPending TM.lookup readTVar)
(pure Nothing) -- prevent race with cleanup and adding pending queues in another call
(Just <$> getSessVar workerSeq srv smpSubWorkers ts)
newSubWorker :: SessionVar (Async ()) -> IO ()
newSubWorker v = do
a <- async $ void (E.try @E.SomeException runSubWorker) >> atomically (cleanup v)
atomically $ putTMVar (sessionVar v) a
runSubWorker =
withRetryInterval (reconnectInterval agentCfg) $ \_ loop -> do
subs <- getPending TM.lookupIO readTVarIO
unless (noPending subs) $ whenM (readTVarIO active) $ do
void $ netTimeoutInt tcpConnectTimeout NRMBackground `timeout` runExceptT (reconnectSMPClient ca srv subs)
loop
ProtocolClientConfig {networkConfig = NetworkConfig {tcpConnectTimeout}} = smpCfg agentCfg
noPending (sSub, qSubs) = isNothing sSub && maybe True M.null qSubs
getPending :: Monad m => (forall a. SMPServer -> TMap SMPServer a -> m (Maybe a)) -> (forall a. TVar a -> m a) -> m (Maybe ServiceSub, Maybe (Map QueueId C.APrivateAuthKey))
getPending lkup rd = do
sSub <- lkup srv (pendingServiceSubs ca) $>>= rd
qSubs <- lkup srv (pendingQueueSubs ca) >>= mapM rd
pure (sSub, qSubs)
cleanup :: SessionVar (Async ()) -> STM ()
cleanup v = do
-- Here we wait until TMVar is not empty to prevent worker cleanup happening before worker is added to TMVar.
-- Not waiting may result in terminated worker remaining in the map.
whenM (isEmptyTMVar $ sessionVar v) retry
removeSessVar v srv smpSubWorkers
reconnectSMPClient :: forall p. SMPClientAgent p -> SMPServer -> (Maybe ServiceSub, Maybe (Map QueueId C.APrivateAuthKey)) -> ExceptT SMPClientError IO ()
reconnectSMPClient ca@SMPClientAgent {agentCfg, agentParty} srv (sSub_, qSubs_) =
withSMP ca srv $ \smp -> liftIO $ case serviceParty agentParty of
Just Dict -> resubscribe smp
Nothing -> pure ()
where
resubscribe :: (PartyI p, ServiceParty p) => SMPClient -> IO ()
resubscribe smp = do
mapM_ (smpSubscribeService ca smp srv) sSub_
forM_ qSubs_ $ \qSubs -> do
currSubs_ <- mapM readTVarIO =<< TM.lookupIO srv (activeQueueSubs ca)
let qSubs' :: [(QueueId, C.APrivateAuthKey)] =
maybe id (\currSubs -> filter ((`M.notMember` currSubs) . fst)) currSubs_ $ M.assocs qSubs
mapM_ (smpSubscribeQueues @p ca smp srv) $ toChunks (agentSubsBatchSize agentCfg) qSubs'
notify :: MonadIO m => SMPClientAgent p -> SMPClientAgentEvent -> m ()
notify ca evt = atomically $ writeTBQueue (agentQ ca) evt
{-# INLINE notify #-}
-- Returns already connected client for proxying messages or Nothing if client is absent, not connected yet or stores expired error.
-- If Nothing is return proxy will spawn a new thread to wait or to create another client connection to destination relay.
getConnectedSMPServerClient :: SMPClientAgent p -> SMPServer -> IO (Maybe (Either SMPClientError (OwnServer, SMPClient)))
getConnectedSMPServerClient SMPClientAgent {smpClients} srv =
atomically (TM.lookup srv smpClients $>>= \v -> (v,) <$$> tryReadTMVar (sessionVar v)) -- Nothing: client is absent or not connected yet
$>>= \case
(_, Right r) -> pure $ Just $ Right r
(v, Left (e, ts_)) ->
pure ts_ $>>= \ts ->
-- proxy will create a new connection if ts_ is Nothing
ifM
((ts <) <$> liftIO getCurrentTime) -- error persistence interval period expired?
(Nothing <$ atomically (removeSessVar v srv smpClients)) -- proxy will create a new connection
(pure $ Just $ Left e) -- not expired, returning error
lookupSMPServerClient :: SMPClientAgent p -> SessionId -> IO (Maybe (OwnServer, SMPClient))
lookupSMPServerClient SMPClientAgent {smpSessions} sessId = TM.lookupIO sessId smpSessions
closeSMPClientAgent :: SMPClientAgent p -> IO ()
closeSMPClientAgent c = do
atomically $ writeTVar (active c) False
closeSMPServerClients c
atomically (swapTVar (smpSubWorkers c) M.empty) >>= mapM_ cancelReconnect
where
cancelReconnect :: SessionVar (Async ()) -> IO ()
cancelReconnect v = void . forkIO $ atomically (readTMVar $ sessionVar v) >>= uninterruptibleCancel
closeSMPServerClients :: SMPClientAgent p -> IO ()
closeSMPServerClients c = atomically (smpClients c `swapTVar` M.empty) >>= mapM_ (forkIO . closeClient)
where
closeClient v =
atomically (readTMVar $ sessionVar v) >>= \case
Right (_, smp) -> closeProtocolClient smp `catchAll_` pure ()
_ -> pure ()
cancelActions :: Foldable f => TVar (f (Async ())) -> IO ()
cancelActions as = readTVarIO as >>= mapM_ uninterruptibleCancel
withSMP :: SMPClientAgent p -> SMPServer -> (SMPClient -> ExceptT SMPClientError IO a) -> ExceptT SMPClientError IO a
withSMP ca srv action = (getSMPServerClient' ca srv >>= action) `catchE` logSMPError
where
logSMPError :: SMPClientError -> ExceptT SMPClientError IO a
logSMPError e = do
logInfo $ "SMP error (" <> safeDecodeUtf8 (strEncode srv) <> "): " <> tshow e
throwE e
subscribeQueuesNtfs :: SMPClientAgent 'NotifierService -> SMPServer -> NonEmpty (NotifierId, NtfPrivateAuthKey) -> IO ()
subscribeQueuesNtfs = subscribeQueues_
{-# INLINE subscribeQueuesNtfs #-}
subscribeQueues_ :: ServiceParty p => SMPClientAgent p -> SMPServer -> NonEmpty (QueueId, C.APrivateAuthKey) -> IO ()
subscribeQueues_ ca srv subs = do
atomically $ addPendingSubs ca srv $ L.toList subs
runExceptT (getSMPServerClient' ca srv) >>= \case
Right smp -> smpSubscribeQueues ca smp srv subs
Left _ -> pure () -- no call to reconnectClient - failing getSMPServerClient' does that
smpSubscribeQueues :: ServiceParty p => SMPClientAgent p -> SMPClient -> SMPServer -> NonEmpty (QueueId, C.APrivateAuthKey) -> IO ()
smpSubscribeQueues ca smp srv subs = do
rs <- case agentParty ca of
SRecipientService -> subscribeSMPQueues smp subs
SNotifierService -> subscribeSMPQueuesNtfs smp subs
rs' <-
atomically $
ifM
(activeClientSession ca smp srv)
(Just <$> processSubscriptions rs)
(pure Nothing)
case rs' of
Just (tempErrs, finalErrs, (qOks, sQs), _) -> do
notify_ (`CASubscribed` Nothing) $ map fst qOks
when (isJust smpServiceId) $ notify_ (`CASubscribed` smpServiceId) sQs
notify_ CASubError finalErrs
when tempErrs $ reconnectClient ca srv
Nothing -> reconnectClient ca srv
where
processSubscriptions :: NonEmpty (Either SMPClientError (Maybe ServiceId)) -> STM (Bool, [(QueueId, SMPClientError)], ([(QueueId, (SessionId, C.APrivateAuthKey))], [QueueId]), [QueueId])
processSubscriptions rs = do
pending <- maybe (pure M.empty) readTVar =<< TM.lookup srv (pendingQueueSubs ca)
let acc@(_, _, (qOks, sQs), notPending) = foldr (groupSub pending) (False, [], ([], []), []) (L.zip subs rs)
unless (null qOks) $ addActiveSubs ca srv qOks
unless (null sQs) $ forM_ smpServiceId $ \serviceId ->
updateActiveServiceSub ca srv (ServiceSub serviceId (fromIntegral $ length sQs) (queueIdsHash sQs), sessId)
unless (null notPending) $ removePendingSubs ca srv notPending
pure acc
sessId = sessionId $ thParams smp
smpServiceId = smpClientServiceId smp
groupSub ::
Map QueueId C.APrivateAuthKey ->
((QueueId, C.APrivateAuthKey), Either SMPClientError (Maybe ServiceId)) ->
(Bool, [(QueueId, SMPClientError)], ([(QueueId, (SessionId, C.APrivateAuthKey))], [QueueId]), [QueueId]) ->
(Bool, [(QueueId, SMPClientError)], ([(QueueId, (SessionId, C.APrivateAuthKey))], [QueueId]), [QueueId])
groupSub pending ((qId, pk), r) acc@(!tempErrs, finalErrs, oks@(qOks, sQs), notPending) = case r of
Right serviceId_
| M.member qId pending ->
let oks' = case (smpServiceId, serviceId_) of
(Just sId, Just sId') | sId == sId' -> (qOks, qId : sQs)
_ -> ((qId, (sessId, pk)) : qOks, sQs)
in (tempErrs, finalErrs, oks', qId : notPending)
| otherwise -> acc
Left e
| temporaryClientError e -> (True, finalErrs, oks, notPending)
| otherwise -> (tempErrs, (qId, e) : finalErrs, oks, qId : notPending)
notify_ :: (SMPServer -> NonEmpty a -> SMPClientAgentEvent) -> [a] -> IO ()
notify_ evt qs = mapM_ (notify ca . evt srv) $ L.nonEmpty qs
subscribeServiceNtfs :: SMPClientAgent 'NotifierService -> SMPServer -> ServiceSub -> IO ()
subscribeServiceNtfs = subscribeService_
{-# INLINE subscribeServiceNtfs #-}
subscribeService_ :: (PartyI p, ServiceParty p) => SMPClientAgent p -> SMPServer -> ServiceSub -> IO ()
subscribeService_ ca srv serviceSub = do
atomically $ setPendingServiceSub ca srv $ Just serviceSub
runExceptT (getSMPServerClient' ca srv) >>= \case
Right smp -> smpSubscribeService ca smp srv serviceSub
Left _ -> pure () -- no call to reconnectClient - failing getSMPServerClient' does that
smpSubscribeService :: (PartyI p, ServiceParty p) => SMPClientAgent p -> SMPClient -> SMPServer -> ServiceSub -> IO ()
smpSubscribeService ca smp srv serviceSub@(ServiceSub serviceId n idsHash) = case smpClientService smp of
Just service | serviceAvailable service -> subscribe
_ -> notifyUnavailable
where
subscribe = do
r <- runExceptT $ subscribeService smp (agentParty ca) n idsHash
ok <-
atomically $
ifM
(activeClientSession ca smp srv)
(True <$ processSubscription r)
(pure False)
if ok
then case r of
Right serviceSub' -> notify ca $ CAServiceSubscribed srv serviceSub serviceSub'
Left e
| smpClientServiceError e -> notifyUnavailable
| temporaryClientError e -> reconnectClient ca srv
| otherwise -> notify ca $ CAServiceSubError srv serviceSub e
else reconnectClient ca srv
processSubscription = mapM_ $ \serviceSub' -> do -- TODO [certs rcv] validate hash here?
setActiveServiceSub ca srv $ Just (serviceSub', sessId)
setPendingServiceSub ca srv Nothing
serviceAvailable THClientService {serviceRole, serviceId = serviceId'} =
serviceId == serviceId' && partyServiceRole (agentParty ca) == serviceRole
notifyUnavailable = do
atomically $ setPendingServiceSub ca srv Nothing
notify ca $ CAServiceUnavailable srv serviceSub -- this will resubscribe all queues directly
sessId = sessionId $ thParams smp
activeClientSession' :: SMPClientAgent p -> SessionId -> SMPServer -> STM Bool
activeClientSession' ca sessId srv = sameSess <$> tryReadSessVar srv (smpClients ca)
where
sameSess = \case
Just (Right (_, smp')) -> sessId == sessionId (thParams smp')
_ -> False
activeClientSession :: SMPClientAgent p -> SMPClient -> SMPServer -> STM Bool
activeClientSession ca = activeClientSession' ca . sessionId . thParams
showServer :: SMPServer -> ByteString
showServer ProtocolServer {host, port} =
strEncode host <> B.pack (if null port then "" else ':' : port)
addActiveSubs :: SMPClientAgent p -> SMPServer -> [(QueueId, (SessionId, C.APrivateAuthKey))] -> STM ()
addActiveSubs = addSubsList_ . activeQueueSubs
{-# INLINE addActiveSubs #-}
addPendingSubs :: SMPClientAgent p -> SMPServer -> [(QueueId, C.APrivateAuthKey)] -> STM ()
addPendingSubs = addSubsList_ . pendingQueueSubs
{-# INLINE addPendingSubs #-}
addSubsList_ :: TMap SMPServer (TMap QueueId s) -> SMPServer -> [(QueueId, s)] -> STM ()
addSubsList_ subs srv ss = addSubs_ subs srv $ M.fromList ss
-- where
-- ss' = M.fromList $ map (first (party,)) ss
addSubs_ :: TMap SMPServer (TMap QueueId s) -> SMPServer -> Map QueueId s -> STM ()
addSubs_ subs srv ss =
TM.lookup srv subs >>= \case
Just m -> TM.union ss m
_ -> TM.insertM srv (newTVar ss) subs
setActiveServiceSub :: SMPClientAgent p -> SMPServer -> Maybe (ServiceSub, SessionId) -> STM ()
setActiveServiceSub = setServiceSub_ activeServiceSubs
{-# INLINE setActiveServiceSub #-}
setPendingServiceSub :: SMPClientAgent p -> SMPServer -> Maybe ServiceSub -> STM ()
setPendingServiceSub = setServiceSub_ pendingServiceSubs
{-# INLINE setPendingServiceSub #-}
setServiceSub_ ::
(SMPClientAgent p -> TMap SMPServer (TVar (Maybe sub))) ->
SMPClientAgent p ->
SMPServer ->
Maybe sub ->
STM ()
setServiceSub_ subsSel ca srv sub =
TM.lookup srv (subsSel ca) >>= \case
Just v -> writeTVar v sub
Nothing -> TM.insertM srv (newTVar sub) (subsSel ca)
updateActiveServiceSub :: SMPClientAgent p -> SMPServer -> (ServiceSub, SessionId) -> STM ()
updateActiveServiceSub ca srv sub@(ServiceSub serviceId' n' idsHash', sessId') =
TM.lookup srv (activeServiceSubs ca) >>= \case
Just v -> modifyTVar' v $ \case
Just (ServiceSub serviceId n idsHash, sessId) | serviceId == serviceId' && sessId == sessId' ->
Just (ServiceSub serviceId (n + n') (idsHash <> idsHash'), sessId)
_ -> Just sub
Nothing -> TM.insertM srv (newTVar $ Just sub) (activeServiceSubs ca)
removeActiveSub :: SMPClientAgent p -> SMPServer -> QueueId -> STM ()
removeActiveSub = removeSub_ . activeQueueSubs
{-# INLINE removeActiveSub #-}
removePendingSub :: SMPClientAgent p -> SMPServer -> QueueId -> STM ()
removePendingSub = removeSub_ . pendingQueueSubs
{-# INLINE removePendingSub #-}
removeSub_ :: TMap SMPServer (TMap QueueId s) -> SMPServer -> QueueId -> STM ()
removeSub_ subs srv s = TM.lookup srv subs >>= mapM_ (TM.delete s)
removeActiveSubs :: SMPClientAgent p -> SMPServer -> [QueueId] -> STM ()
removeActiveSubs = removeSubs_ . activeQueueSubs
{-# INLINE removeActiveSubs #-}
removePendingSubs :: SMPClientAgent p -> SMPServer -> [QueueId] -> STM ()
removePendingSubs = removeSubs_ . pendingQueueSubs
{-# INLINE removePendingSubs #-}
removeSubs_ :: TMap SMPServer (TMap QueueId s) -> SMPServer -> [QueueId] -> STM ()
removeSubs_ subs srv qs = TM.lookup srv subs >>= mapM_ (`modifyTVar'` (`M.withoutKeys` S.fromList qs))