mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-06-04 21:41:24 +00:00
smp: check for progress when resubscribing (#956)
* smp: check for progress when resubscribing * add allowClose to reconnectServer to distinguish entry points * resolve error todo * make reconnect async per-session * remove allowClose * deregister reconnecter when it finishes * signal/react more work explicitly * fix restart condition * wait for reconnecter to finish * remove redundant reconnect locks * rename getClientVar for expanded scope * formatting * remove withPending * move pending check to tryReconnectSMPClient loop * combine pending check and slot release transactions * actually reserve the slot for async * simplify * refactor * refactor * use removeClientVar * rename * refactor * test * reduce MonadError scope --------- Co-authored-by: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
17f64e1565
commit
68f5e189a6
@@ -117,6 +117,9 @@ xftpTransportHost XFTPClient {http2Client = HTTP2Client {client_ = HClient {host
|
||||
xftpSessionTs :: XFTPClient -> UTCTime
|
||||
xftpSessionTs = sessionTs . http2Client
|
||||
|
||||
xftpSessionId :: XFTPClient -> ByteString
|
||||
xftpSessionId = sessionId . http2Client
|
||||
|
||||
xftpHTTP2Config :: TransportClientConfig -> XFTPClientConfig -> HTTP2ClientConfig
|
||||
xftpHTTP2Config transportConfig XFTPClientConfig {xftpNetworkConfig = NetworkConfig {tcpConnectTimeout}} =
|
||||
defaultHTTP2ClientConfig
|
||||
|
||||
@@ -1792,12 +1792,11 @@ getAgentMigrations' :: AgentMonad m => AgentClient -> m [UpMigration]
|
||||
getAgentMigrations' c = map upMigration <$> withStore' c (Migrations.getCurrent . DB.conn)
|
||||
|
||||
debugAgentLocks' :: AgentMonad' m => AgentClient -> m AgentLocks
|
||||
debugAgentLocks' AgentClient {connLocks = cs, invLocks = is, reconnectLocks = rs, deleteLock = d} = do
|
||||
debugAgentLocks' AgentClient {connLocks = cs, invLocks = is, deleteLock = d} = do
|
||||
connLocks <- getLocks cs
|
||||
invLocks <- getLocks is
|
||||
srvLocks <- getLocks rs
|
||||
delLock <- atomically $ tryReadTMVar d
|
||||
pure AgentLocks {connLocks, invLocks, srvLocks, delLock}
|
||||
pure AgentLocks {connLocks, invLocks, delLock}
|
||||
where
|
||||
getLocks ls = atomically $ M.mapKeys (B.unpack . strEncode) . M.mapMaybe id <$> (mapM tryReadTMVar =<< readTVar ls)
|
||||
|
||||
|
||||
@@ -264,15 +264,19 @@ data AgentClient = AgentClient
|
||||
invLocks :: TMap ByteString Lock,
|
||||
-- lock to prevent concurrency between periodic and async connection deletions
|
||||
deleteLock :: Lock,
|
||||
-- locks to prevent concurrent reconnections to SMP servers
|
||||
reconnectLocks :: TMap SMPTransportSession Lock,
|
||||
reconnections :: TAsyncs,
|
||||
-- smpSubWorkers for SMP servers sessions
|
||||
smpSubWorkers :: TMap SMPTransportSession (TMVar SubWorker),
|
||||
asyncClients :: TAsyncs,
|
||||
agentStats :: TMap AgentStatsKey (TVar Int),
|
||||
clientId :: Int,
|
||||
agentEnv :: Env
|
||||
}
|
||||
|
||||
data SubWorker = SubWorker
|
||||
{ subWorkerId :: Int,
|
||||
subWorkerAsync :: Async ()
|
||||
}
|
||||
|
||||
getAgentWorker :: (AgentMonad' m, Ord k, Show k) => String -> Bool -> AgentClient -> k -> TMap k Worker -> (Worker -> ExceptT AgentErrorType m ()) -> m Worker
|
||||
getAgentWorker = getAgentWorker' id pure
|
||||
|
||||
@@ -361,7 +365,6 @@ data AgentState = ASForeground | ASSuspending | ASSuspended
|
||||
data AgentLocks = AgentLocks
|
||||
{ connLocks :: Map String String,
|
||||
invLocks :: Map String String,
|
||||
srvLocks :: Map String String,
|
||||
delLock :: Maybe String
|
||||
}
|
||||
deriving (Show)
|
||||
@@ -407,8 +410,7 @@ newAgentClient InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = do
|
||||
connLocks <- TM.empty
|
||||
invLocks <- TM.empty
|
||||
deleteLock <- createLock
|
||||
reconnectLocks <- TM.empty
|
||||
reconnections <- newTAsyncs
|
||||
smpSubWorkers <- TM.empty
|
||||
asyncClients <- newTAsyncs
|
||||
agentStats <- TM.empty
|
||||
clientId <- stateTVar (clientCounter agentEnv) $ \i -> let i' = i + 1 in (i', i')
|
||||
@@ -443,8 +445,7 @@ newAgentClient InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = do
|
||||
connLocks,
|
||||
invLocks,
|
||||
deleteLock,
|
||||
reconnectLocks,
|
||||
reconnections,
|
||||
smpSubWorkers,
|
||||
asyncClients,
|
||||
agentStats,
|
||||
clientId,
|
||||
@@ -465,6 +466,7 @@ class (Encoding err, Show err) => ProtocolServerClient err msg | msg -> err wher
|
||||
clientServer :: Client msg -> String
|
||||
clientTransportHost :: Client msg -> TransportHost
|
||||
clientSessionTs :: Client msg -> UTCTime
|
||||
clientSessionId :: Client msg -> ByteString
|
||||
|
||||
instance ProtocolServerClient ErrorType BrokerMsg where
|
||||
type Client BrokerMsg = ProtocolClient ErrorType BrokerMsg
|
||||
@@ -474,6 +476,7 @@ instance ProtocolServerClient ErrorType BrokerMsg where
|
||||
clientServer = protocolClientServer
|
||||
clientTransportHost = transportHost'
|
||||
clientSessionTs = sessionTs
|
||||
clientSessionId = sessionId
|
||||
|
||||
instance ProtocolServerClient ErrorType NtfResponse where
|
||||
type Client NtfResponse = ProtocolClient ErrorType NtfResponse
|
||||
@@ -483,6 +486,7 @@ instance ProtocolServerClient ErrorType NtfResponse where
|
||||
clientServer = protocolClientServer
|
||||
clientTransportHost = transportHost'
|
||||
clientSessionTs = sessionTs
|
||||
clientSessionId = sessionId
|
||||
|
||||
instance ProtocolServerClient XFTPErrorType FileResponse where
|
||||
type Client FileResponse = XFTPClient
|
||||
@@ -492,16 +496,15 @@ instance ProtocolServerClient XFTPErrorType FileResponse where
|
||||
clientServer = X.xftpClientServer
|
||||
clientTransportHost = X.xftpTransportHost
|
||||
clientSessionTs = X.xftpSessionTs
|
||||
clientSessionId = X.xftpSessionId
|
||||
|
||||
getSMPServerClient :: forall m. AgentMonad m => AgentClient -> SMPTransportSession -> m SMPClient
|
||||
getSMPServerClient c@AgentClient {active, smpClients, msgQ} tSess@(userId, srv, _) = do
|
||||
unlessM (readTVarIO active) . throwError $ INACTIVE
|
||||
atomically (getClientVar tSess smpClients)
|
||||
atomically (getTSessVar tSess smpClients)
|
||||
>>= either newClient (waitForProtocolClient c tSess)
|
||||
where
|
||||
newClient v = do
|
||||
tc <- newTVarIO 0
|
||||
newProtocolClient c tSess smpClients connectClient (reconnectSMPClient 0 tc) v
|
||||
newClient = newProtocolClient c tSess smpClients connectClient resubscribeSMPSession
|
||||
connectClient :: m SMPClient
|
||||
connectClient = do
|
||||
cfg <- getClientConfig c smpCfg
|
||||
@@ -515,7 +518,7 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} tSess@(userId, srv,
|
||||
where
|
||||
removeClientAndSubs :: IO ([RcvQueue], [ConnId])
|
||||
removeClientAndSubs = atomically $ do
|
||||
TM.delete tSess smpClients
|
||||
removeClientVar client tSess smpClients
|
||||
qs <- RQ.getDelSessQueues tSess $ activeSubs c
|
||||
mapM_ (`RQ.addQueue` pendingSubs c) qs
|
||||
let cs = S.fromList $ map qConnId qs
|
||||
@@ -529,41 +532,49 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} tSess@(userId, srv,
|
||||
unless (null conns) $ notifySub "" $ DOWN srv conns
|
||||
unless (null qs) $ do
|
||||
atomically $ mapM_ (releaseGetLock c) qs
|
||||
unliftIO u $ reconnectServer c tSess
|
||||
unliftIO u $ resubscribeSMPSession c tSess
|
||||
|
||||
notifySub :: forall e. AEntityI e => ConnId -> ACommand 'Agent e -> IO ()
|
||||
notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC (sAEntity @e) cmd)
|
||||
|
||||
reconnectServer :: AgentMonad m => AgentClient -> SMPTransportSession -> m ()
|
||||
reconnectServer c tSess = newAsyncAction tryReconnectSMPClient $ reconnections c
|
||||
resubscribeSMPSession :: AgentMonad' m => AgentClient -> SMPTransportSession -> m ()
|
||||
resubscribeSMPSession c@AgentClient {smpSubWorkers} tSess =
|
||||
atomically (getTSessVar tSess smpSubWorkers) >>= either newSubWorker (\_ -> pure ())
|
||||
where
|
||||
tryReconnectSMPClient aId = do
|
||||
newSubWorker v = do
|
||||
subWorkerId <- atomically $ stateTVar (workerSeq c) $ \next -> (next, next + 1)
|
||||
subWorkerAsync <- async $ runSubWorker subWorkerId `E.catchAny` const (atomically $ cleanup subWorkerId)
|
||||
atomically $ putTMVar v SubWorker {subWorkerId, subWorkerAsync}
|
||||
runSubWorker swId = do
|
||||
ri <- asks $ reconnectInterval . config
|
||||
timeoutCounts <- newTVarIO 0
|
||||
withRetryIntervalCount ri $ \n _ loop ->
|
||||
reconnectSMPClient n timeoutCounts c tSess `catchAgentError` const loop
|
||||
atomically . removeAsyncAction aId $ reconnections c
|
||||
withRetryInterval ri $ \_ loop -> do
|
||||
pending <- atomically $ do
|
||||
qs <- RQ.getSessQueues tSess (pendingSubs c)
|
||||
when (null qs) $ cleanup swId
|
||||
pure qs
|
||||
forM_ (L.nonEmpty pending) $ \qs -> do
|
||||
void . runExceptT $ reconnectSMPClient timeoutCounts c tSess qs `catchAgentError` \_ -> pure ()
|
||||
loop
|
||||
cleanup :: Int -> STM ()
|
||||
cleanup swId = removeTSessVar ((swId ==) . subWorkerId) tSess smpSubWorkers
|
||||
|
||||
reconnectSMPClient :: forall m. AgentMonad m => Int -> TVar Int -> AgentClient -> SMPTransportSession -> m ()
|
||||
reconnectSMPClient n tc c tSess@(_, srv, _) = do
|
||||
ts <- liftIO getCurrentTime
|
||||
let label = unwords ["reconnect", show n, show ts]
|
||||
withLockMap_ (reconnectLocks c) tSess label $ do
|
||||
qs <- atomically (RQ.getSessQueues tSess $ pendingSubs c)
|
||||
NetworkConfig {tcpTimeout} <- readTVarIO $ useNetworkConfig c
|
||||
-- this allows 3x of timeout per batch of subscription (90 queues per batch empirically)
|
||||
let t = (length qs `div` 90 + 1) * tcpTimeout * 3
|
||||
t `timeout` mapM_ resubscribe (L.nonEmpty qs) >>= \case
|
||||
Just _ -> atomically $ writeTVar tc 0
|
||||
Nothing -> do
|
||||
tc' <- atomically $ stateTVar tc $ \i -> (i + 1, i + 1)
|
||||
maxTC <- asks $ maxSubscriptionTimeouts . config
|
||||
let err = if tc' >= maxTC then CRITICAL True else INTERNAL
|
||||
msg = show tc' <> " consecutive subscription timeouts: " <> show (length qs) <> " queues, transport session: " <> show tSess
|
||||
atomically $ writeTBQueue (subQ c) ("", "", APC SAEConn $ ERR $ err msg)
|
||||
reconnectSMPClient :: forall m. AgentMonad m => TVar Int -> AgentClient -> SMPTransportSession -> NonEmpty RcvQueue -> m ()
|
||||
reconnectSMPClient tc c tSess@(_, srv, _) qs = do
|
||||
NetworkConfig {tcpTimeout} <- readTVarIO $ useNetworkConfig c
|
||||
-- this allows 3x of timeout per batch of subscription (90 queues per batch empirically)
|
||||
let t = (length qs `div` 90 + 1) * tcpTimeout * 3
|
||||
t `timeout` resubscribe >>= \case
|
||||
Just _ -> atomically $ writeTVar tc 0
|
||||
Nothing -> do
|
||||
tc' <- atomically $ stateTVar tc $ \i -> (i + 1, i + 1)
|
||||
maxTC <- asks $ maxSubscriptionTimeouts . config
|
||||
let err = if tc' >= maxTC then CRITICAL True else INTERNAL
|
||||
msg = show tc' <> " consecutive subscription timeouts: " <> show (length qs) <> " queues, transport session: " <> show tSess
|
||||
atomically $ writeTBQueue (subQ c) ("", "", APC SAEConn $ ERR $ err msg)
|
||||
where
|
||||
resubscribe :: NonEmpty RcvQueue -> m ()
|
||||
resubscribe qs = do
|
||||
resubscribe :: m ()
|
||||
resubscribe = do
|
||||
cs <- atomically . RQ.getConns $ activeSubs c
|
||||
rs <- subscribeQueues c $ L.toList qs
|
||||
let (errs, okConns) = partitionEithers $ map (\(RcvQueue {connId}, r) -> bimap (connId,) (const connId) r) rs
|
||||
@@ -572,14 +583,17 @@ reconnectSMPClient n tc c tSess@(_, srv, _) = do
|
||||
unless (null conns) $ notifySub "" $ UP srv conns
|
||||
let (tempErrs, finalErrs) = partition (temporaryAgentError . snd) errs
|
||||
liftIO $ mapM_ (\(connId, e) -> notifySub connId $ ERR e) finalErrs
|
||||
mapM_ (throwError . snd) $ listToMaybe tempErrs
|
||||
forM_ (listToMaybe tempErrs) $ \(_, err) -> do
|
||||
when (null okConns && S.null cs && null finalErrs) . liftIO $
|
||||
closeClient c smpClients tSess
|
||||
throwError err
|
||||
notifySub :: forall e. AEntityI e => ConnId -> ACommand 'Agent e -> IO ()
|
||||
notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC (sAEntity @e) cmd)
|
||||
|
||||
getNtfServerClient :: forall m. AgentMonad m => AgentClient -> NtfTransportSession -> m NtfClient
|
||||
getNtfServerClient c@AgentClient {active, ntfClients} tSess@(userId, srv, _) = do
|
||||
unlessM (readTVarIO active) . throwError $ INACTIVE
|
||||
atomically (getClientVar tSess ntfClients)
|
||||
atomically (getTSessVar tSess ntfClients)
|
||||
>>= either
|
||||
(newProtocolClient c tSess ntfClients connectClient $ \_ _ -> pure ())
|
||||
(waitForProtocolClient c tSess)
|
||||
@@ -591,7 +605,7 @@ getNtfServerClient c@AgentClient {active, ntfClients} tSess@(userId, srv, _) = d
|
||||
|
||||
clientDisconnected :: NtfClient -> IO ()
|
||||
clientDisconnected client = do
|
||||
atomically $ TM.delete tSess ntfClients
|
||||
atomically $ removeClientVar client tSess ntfClients
|
||||
incClientStat c userId client "DISCONNECT" ""
|
||||
atomically $ writeTBQueue (subQ c) ("", "", APC SAENone $ hostEvent DISCONNECT client)
|
||||
logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv
|
||||
@@ -599,7 +613,7 @@ getNtfServerClient c@AgentClient {active, ntfClients} tSess@(userId, srv, _) = d
|
||||
getXFTPServerClient :: forall m. AgentMonad m => AgentClient -> XFTPTransportSession -> m XFTPClient
|
||||
getXFTPServerClient c@AgentClient {active, xftpClients, useNetworkConfig} tSess@(userId, srv, _) = do
|
||||
unlessM (readTVarIO active) . throwError $ INACTIVE
|
||||
atomically (getClientVar tSess xftpClients)
|
||||
atomically (getTSessVar tSess xftpClients)
|
||||
>>= either
|
||||
(newProtocolClient c tSess xftpClients connectClient $ \_ _ -> pure ())
|
||||
(waitForProtocolClient c tSess)
|
||||
@@ -612,13 +626,13 @@ getXFTPServerClient c@AgentClient {active, xftpClients, useNetworkConfig} tSess@
|
||||
|
||||
clientDisconnected :: XFTPClient -> IO ()
|
||||
clientDisconnected client = do
|
||||
atomically $ TM.delete tSess xftpClients
|
||||
atomically $ removeClientVar client tSess xftpClients
|
||||
incClientStat c userId client "DISCONNECT" ""
|
||||
atomically $ writeTBQueue (subQ c) ("", "", APC SAENone $ hostEvent DISCONNECT client)
|
||||
logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv
|
||||
|
||||
getClientVar :: forall a s. TransportSession s -> TMap (TransportSession s) (TMVar a) -> STM (Either (TMVar a) (TMVar a))
|
||||
getClientVar tSess clients = maybe (Left <$> newClientVar) (pure . Right) =<< TM.lookup tSess clients
|
||||
getTSessVar :: forall a s. TransportSession s -> TMap (TransportSession s) (TMVar a) -> STM (Either (TMVar a) (TMVar a))
|
||||
getTSessVar tSess clients = maybe (Left <$> newClientVar) (pure . Right) =<< TM.lookup tSess clients
|
||||
where
|
||||
newClientVar :: STM (TMVar a)
|
||||
newClientVar = do
|
||||
@@ -626,6 +640,18 @@ getClientVar tSess clients = maybe (Left <$> newClientVar) (pure . Right) =<< TM
|
||||
TM.insert tSess var clients
|
||||
pure var
|
||||
|
||||
removeClientVar :: ProtocolServerClient err msg => Client msg -> TransportSession msg -> TMap (TransportSession msg) (ClientVar msg) -> STM ()
|
||||
removeClientVar = removeTSessVar . either (const False) . sameClient
|
||||
|
||||
sameClient :: ProtocolServerClient err msg => Client msg -> Client msg -> Bool
|
||||
sameClient c c' = clientSessionId c == clientSessionId c'
|
||||
|
||||
removeTSessVar :: (a -> Bool) -> TransportSession msg -> TMap (TransportSession msg) (TMVar a) -> STM ()
|
||||
removeTSessVar same tSess vs =
|
||||
TM.lookup tSess vs
|
||||
$>>= tryReadTMVar
|
||||
>>= mapM_ (\v -> when (same v) $ TM.delete tSess vs)
|
||||
|
||||
waitForProtocolClient :: (AgentMonad m, ProtocolTypeI (ProtoType msg)) => AgentClient -> TransportSession msg -> ClientVar msg -> m (Client msg)
|
||||
waitForProtocolClient c (_, srv, _) clientVar = do
|
||||
NetworkConfig {tcpConnectTimeout} <- readTVarIO $ useNetworkConfig c
|
||||
@@ -635,6 +661,7 @@ waitForProtocolClient c (_, srv, _) clientVar = do
|
||||
Just (Left e) -> Left e
|
||||
Nothing -> Left $ BROKER (B.unpack $ strEncode srv) TIMEOUT
|
||||
|
||||
-- clientConnected arg is only passed for SMP server
|
||||
newProtocolClient ::
|
||||
forall err msg m.
|
||||
(AgentMonad m, ProtocolTypeI (ProtoType msg), ProtocolServerClient err msg) =>
|
||||
@@ -645,7 +672,7 @@ newProtocolClient ::
|
||||
(AgentClient -> TransportSession msg -> m ()) ->
|
||||
ClientVar msg ->
|
||||
m (Client msg)
|
||||
newProtocolClient c tSess@(userId, srv, entityId_) clients connectClient reconnectClient clientVar = tryConnectClient pure tryConnectAsync
|
||||
newProtocolClient c tSess@(userId, srv, entityId_) clients connectClient clientConnected clientVar = tryConnectClient pure tryConnectAsync
|
||||
where
|
||||
tryConnectClient :: (Client msg -> m a) -> m () -> m a
|
||||
tryConnectClient successAction retryAction =
|
||||
@@ -669,7 +696,7 @@ newProtocolClient c tSess@(userId, srv, entityId_) clients connectClient reconne
|
||||
connectAsync :: Int -> m ()
|
||||
connectAsync aId = do
|
||||
ri <- asks $ reconnectInterval . config
|
||||
withRetryInterval ri $ \_ loop -> void $ tryConnectClient (const $ reconnectClient c tSess) loop
|
||||
withRetryInterval ri $ \_ loop -> void $ tryConnectClient (const $ clientConnected c tSess) loop
|
||||
atomically . removeAsyncAction aId $ asyncClients c
|
||||
|
||||
hostEvent :: forall err msg. (ProtocolTypeI (ProtoType msg), ProtocolServerClient err msg) => (AProtocolType -> TransportHost -> ACommand 'Agent 'AENone) -> Client msg -> ACommand 'Agent 'AENone
|
||||
@@ -687,7 +714,7 @@ closeAgentClient c = liftIO $ do
|
||||
closeProtocolServerClients c smpClients
|
||||
closeProtocolServerClients c ntfClients
|
||||
closeProtocolServerClients c xftpClients
|
||||
cancelActions . actions $ reconnections c
|
||||
atomically (swapTVar (smpSubWorkers c) M.empty) >>= mapM_ cancelReconnect
|
||||
cancelActions . actions $ asyncClients c
|
||||
clearWorkers smpDeliveryWorkers >>= mapM_ (cancelWorker . fst)
|
||||
clearWorkers asyncCmdWorkers >>= mapM_ cancelWorker
|
||||
@@ -701,6 +728,8 @@ closeAgentClient c = liftIO $ do
|
||||
clearWorkers workers = atomically $ swapTVar (workers c) mempty
|
||||
clear :: Monoid m => (AgentClient -> TVar m) -> IO ()
|
||||
clear sel = atomically $ writeTVar (sel c) mempty
|
||||
cancelReconnect :: TMVar SubWorker -> IO ()
|
||||
cancelReconnect v = void . forkIO $ atomically (readTMVar v) >>= \(SubWorker _ a) -> uninterruptibleCancel a
|
||||
|
||||
cancelWorker :: Worker -> IO ()
|
||||
cancelWorker Worker {doWork, action} = do
|
||||
@@ -991,7 +1020,7 @@ temporaryOrHostError = \case
|
||||
e -> temporaryAgentError e
|
||||
|
||||
-- | Subscribe to queues. The list of results can have a different order.
|
||||
subscribeQueues :: forall m. AgentMonad m => AgentClient -> [RcvQueue] -> m [(RcvQueue, Either AgentErrorType ())]
|
||||
subscribeQueues :: forall m. AgentMonad' m => AgentClient -> [RcvQueue] -> m [(RcvQueue, Either AgentErrorType ())]
|
||||
subscribeQueues c qs = do
|
||||
(errs, qs') <- partitionEithers <$> mapM checkQueue qs
|
||||
forM_ qs' $ \rq@RcvQueue {connId} -> atomically $ do
|
||||
@@ -1009,13 +1038,13 @@ subscribeQueues c qs = do
|
||||
rs <- sendBatch subscribeSMPQueues smp qs'
|
||||
mapM_ (uncurry $ processSubResult c) rs
|
||||
when (any temporaryClientError . lefts . map snd $ L.toList rs) . unliftIO u $
|
||||
reconnectServer c (transportSession' smp)
|
||||
resubscribeSMPSession c (transportSession' smp)
|
||||
pure rs
|
||||
|
||||
type BatchResponses e r = (NonEmpty (RcvQueue, Either e r))
|
||||
|
||||
-- statBatchSize is not used to batch the commands, only for traffic statistics
|
||||
sendTSessionBatches :: forall m q r. AgentMonad m => ByteString -> Int -> (q -> RcvQueue) -> (SMPClient -> NonEmpty q -> IO (BatchResponses SMPClientError r)) -> AgentClient -> [q] -> m [(RcvQueue, Either AgentErrorType r)]
|
||||
sendTSessionBatches :: forall m q r. AgentMonad' m => ByteString -> Int -> (q -> RcvQueue) -> (SMPClient -> NonEmpty q -> IO (BatchResponses SMPClientError r)) -> AgentClient -> [q] -> m [(RcvQueue, Either AgentErrorType r)]
|
||||
sendTSessionBatches statCmd statBatchSize toRQ action c qs =
|
||||
concatMap L.toList <$> (mapConcurrently sendClientBatch =<< batchQueues)
|
||||
where
|
||||
@@ -1029,7 +1058,7 @@ sendTSessionBatches statCmd statBatchSize toRQ action c qs =
|
||||
in M.alter (Just . maybe [q] (q <|)) tSess m
|
||||
sendClientBatch :: (SMPTransportSession, NonEmpty q) -> m (BatchResponses AgentErrorType r)
|
||||
sendClientBatch (tSess@(userId, srv, _), qs') =
|
||||
tryError (getSMPServerClient c tSess) >>= \case
|
||||
runExceptT (getSMPServerClient c tSess) >>= \case
|
||||
Left e -> pure $ L.map ((,Left e) . toRQ) qs'
|
||||
Right smp -> liftIO $ do
|
||||
logServer "-->" c srv (bshow (length qs') <> " queues") statCmd
|
||||
@@ -1130,7 +1159,7 @@ enableQueueNotifications c rq@RcvQueue {rcvId, rcvPrivateKey} notifierKey rcvNtf
|
||||
withSMPClient c rq "NKEY <nkey>" $ \smp ->
|
||||
enableSMPQueueNotifications smp rcvPrivateKey rcvId notifierKey rcvNtfPublicDhKey
|
||||
|
||||
enableQueuesNtfs :: forall m. AgentMonad m => AgentClient -> [(RcvQueue, SMP.NtfPublicVerifyKey, SMP.RcvNtfPublicDhKey)] -> m [(RcvQueue, Either AgentErrorType (SMP.NotifierId, SMP.RcvNtfPublicDhKey))]
|
||||
enableQueuesNtfs :: forall m. AgentMonad' m => AgentClient -> [(RcvQueue, SMP.NtfPublicVerifyKey, SMP.RcvNtfPublicDhKey)] -> m [(RcvQueue, Either AgentErrorType (SMP.NotifierId, SMP.RcvNtfPublicDhKey))]
|
||||
enableQueuesNtfs = sendTSessionBatches "NKEY" 90 fst3 enableQueues_
|
||||
where
|
||||
fst3 (x, _, _) = x
|
||||
@@ -1144,7 +1173,7 @@ disableQueueNotifications c rq@RcvQueue {rcvId, rcvPrivateKey} =
|
||||
withSMPClient c rq "NDEL" $ \smp ->
|
||||
disableSMPQueueNotifications smp rcvPrivateKey rcvId
|
||||
|
||||
disableQueuesNtfs :: forall m. AgentMonad m => AgentClient -> [RcvQueue] -> m [(RcvQueue, Either AgentErrorType ())]
|
||||
disableQueuesNtfs :: forall m. AgentMonad' m => AgentClient -> [RcvQueue] -> m [(RcvQueue, Either AgentErrorType ())]
|
||||
disableQueuesNtfs = sendTSessionBatches "NDEL" 90 id $ sendBatch disableSMPQueuesNtfs
|
||||
|
||||
sendAck :: AgentMonad m => AgentClient -> RcvQueue -> MsgId -> m ()
|
||||
@@ -1171,7 +1200,7 @@ deleteQueue c rq@RcvQueue {rcvId, rcvPrivateKey} = do
|
||||
withSMPClient c rq "DEL" $ \smp ->
|
||||
deleteSMPQueue smp rcvPrivateKey rcvId
|
||||
|
||||
deleteQueues :: forall m. AgentMonad m => AgentClient -> [RcvQueue] -> m [(RcvQueue, Either AgentErrorType ())]
|
||||
deleteQueues :: forall m. AgentMonad' m => AgentClient -> [RcvQueue] -> m [(RcvQueue, Either AgentErrorType ())]
|
||||
deleteQueues = sendTSessionBatches "DEL" 90 id $ sendBatch deleteSMPQueues
|
||||
|
||||
sendAgentMessage :: AgentMonad m => AgentClient -> SndQueue -> MsgFlags -> ByteString -> m ()
|
||||
|
||||
@@ -788,7 +788,7 @@ testDuplicateMessage t = do
|
||||
-- commenting two lines below and uncommenting further two lines would also runRight_,
|
||||
-- it is the scenario tested above, when the message was not acknowledged by the user
|
||||
threadDelay 200000
|
||||
Left (BROKER _ TIMEOUT) <- runExceptT $ ackMessage bob1 aliceId 5 Nothing
|
||||
Left (BROKER _ NETWORK) <- runExceptT $ ackMessage bob1 aliceId 5 Nothing
|
||||
|
||||
disconnectAgentClient alice
|
||||
disconnectAgentClient bob1
|
||||
|
||||
Reference in New Issue
Block a user