Merge branch 'stable'

This commit is contained in:
Evgeny Poberezkin
2024-05-21 23:42:49 +01:00
5 changed files with 45 additions and 53 deletions
+7
View File
@@ -1,3 +1,10 @@
# 5.7.4
SMP agent:
- remove re-subscription timeouts (as they are tracked per operation, and could cause failed subscriptions).
- reconnect XFTP clients when network settings changes.
- fix lock contention resulting in stuck subscriptions on network change.
# 5.7.3
SMP/NTF protocol:
+21 -37
View File
@@ -702,12 +702,11 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess =
atomically $ putTMVar (sessionVar v) a
runSubWorker = do
ri <- asks $ reconnectInterval . config
timeoutCounts <- newTVarIO 0
withRetryInterval ri $ \_ loop -> do
pending <- atomically getPending
forM_ (L.nonEmpty pending) $ \qs -> do
lift $ waitForUserNetwork c
void . tryAgentError' $ reconnectSMPClient timeoutCounts c tSess qs
liftIO $ waitForUserNetwork c
reconnectSMPClient c tSess qs
loop
getPending = RQ.getSessQueues tSess $ pendingSubs c
cleanup :: SessionVar (Async ()) -> STM ()
@@ -717,41 +716,26 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess =
whenM (isEmptyTMVar $ sessionVar v) retry
removeSessVar v tSess smpSubWorkers
reconnectSMPClient :: TVar Int -> AgentClient -> SMPTransportSession -> NonEmpty RcvQueue -> AM ()
reconnectSMPClient tc c tSess@(_, srv, _) qs = do
NetworkConfig {tcpTimeout} <- atomically $ getNetworkConfig c
-- this allows 3x of timeout per batch of subscription (90 queues per batch empirically)
let t = (length qs `div` 90 + 1) * tcpTimeout * 3
ExceptT (sequence <$> (t `timeout` runExceptT resubscribe)) >>= \case
Just _ -> resetTimeouts
-- reset and do not report consecutive timeouts while offline
Nothing -> ifM (atomically $ isNetworkOnline c) notifyTimeout resetTimeouts
reconnectSMPClient :: AgentClient -> SMPTransportSession -> NonEmpty RcvQueue -> AM' ()
reconnectSMPClient c tSess@(_, srv, _) qs = handleNotify $ do
cs <- readTVarIO $ RQ.getConnections $ activeSubs c
(rs, sessId_) <- subscribeQueues c $ L.toList qs
let (errs, okConns) = partitionEithers $ map (\(RcvQueue {connId}, r) -> bimap (connId,) (const connId) r) rs
conns = filter (`M.notMember` cs) okConns
unless (null conns) $ notifySub "" $ UP srv conns
let (tempErrs, finalErrs) = partition (temporaryAgentError . snd) errs
mapM_ (\(connId, e) -> notifySub connId $ ERR e) finalErrs
forM_ (listToMaybe tempErrs) $ \(connId, e) -> do
when (null okConns && M.null cs && null finalErrs) . liftIO $
forM_ sessId_ $ \sessId -> do
-- We only close the client session that was used to subscribe.
v_ <- atomically $ ifM (activeClientSession c tSess sessId) (TM.lookupDelete tSess $ smpClients c) (pure Nothing)
mapM_ (closeClient_ c) v_
notifySub connId $ ERR e
where
resetTimeouts = atomically $ writeTVar tc 0
notifyTimeout = do
tc' <- atomically $ stateTVar tc $ \i -> (i + 1, i + 1)
maxTC <- asks $ maxSubscriptionTimeouts . config
when (tc' >= maxTC) $ do
let msg = show tc' <> " consecutive subscription timeouts: " <> show (length qs) <> " queues, transport session: " <> show tSess
atomically $ writeTBQueue (subQ c) ("", "", APC SAEConn $ ERR $ INTERNAL msg)
resubscribe :: AM ()
resubscribe = do
cs <- readTVarIO $ RQ.getConnections $ activeSubs c
(rs, sessId_) <- lift . subscribeQueues c $ L.toList qs
let (errs, okConns) = partitionEithers $ map (\(RcvQueue {connId}, r) -> bimap (connId,) (const connId) r) rs
liftIO $ do
let conns = filter (`M.notMember` cs) okConns
unless (null conns) $ notifySub "" $ UP srv conns
let (tempErrs, finalErrs) = partition (temporaryAgentError . snd) errs
liftIO $ mapM_ (\(connId, e) -> notifySub connId $ ERR e) finalErrs
forM_ (listToMaybe tempErrs) $ \(_, err) -> do
when (null okConns && M.null cs && null finalErrs) . liftIO $
forM_ sessId_ $ \sessId -> do
-- We only close the client session that was used to subscribe.
v_ <- atomically $ ifM (activeClientSession c tSess sessId) (TM.lookupDelete tSess $ smpClients c) (pure Nothing)
mapM_ (closeClient_ c) v_
throwError err
notifySub :: forall e. AEntityI e => ConnId -> ACommand 'Agent e -> IO ()
handleNotify :: AM' () -> AM' ()
handleNotify = E.handleAny $ notifySub "" . ERR . INTERNAL . show
notifySub :: forall e. AEntityI e => ConnId -> ACommand 'Agent e -> AM' ()
notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC (sAEntity @e) cmd)
getNtfServerClient :: AgentClient -> NtfTransportSession -> AM NtfClient
@@ -102,7 +102,6 @@ data AgentConfig = AgentConfig
cleanupInterval :: Int64,
cleanupStepInterval :: Int,
maxWorkerRestartsPerMin :: Int,
maxSubscriptionTimeouts :: Int,
storedMsgDataTTL :: NominalDiffTime,
rcvFilesTTL :: NominalDiffTime,
sndFilesTTL :: NominalDiffTime,
@@ -173,9 +172,6 @@ defaultAgentConfig =
cleanupInterval = 30 * 60 * 1000000, -- 30 minutes
cleanupStepInterval = 200000, -- 200ms
maxWorkerRestartsPerMin = 5,
-- 5 consecutive subscription timeouts will result in alert to the user
-- this is a fallback, as the timeout set to 3x of expected timeout, to avoid potential locking.
maxSubscriptionTimeouts = 5,
storedMsgDataTTL = 21 * nominalDay,
rcvFilesTTL = 2 * nominalDay,
sndFilesTTL = nominalDay,
+16 -12
View File
@@ -95,23 +95,24 @@ type AEntityTransmission p e = (ACorrId, ConnId, ACommand p e)
type AEntityTransmissionOrError p e = (ACorrId, ConnId, Either AgentErrorType (ACommand p e))
tGetAgent :: Transport c => c -> IO (AEntityTransmissionOrError 'Agent 'AEConn)
tGetAgent = tGetAgent'
tGetAgent = tGetAgent' True
tGetAgent' :: forall c e. (Transport c, AEntityI e) => c -> IO (AEntityTransmissionOrError 'Agent e)
tGetAgent' h = do
(corrId, connId, cmdOrErr) <- pGetAgent h
tGetAgent' :: forall c e. (Transport c, AEntityI e) => Bool -> c -> IO (AEntityTransmissionOrError 'Agent e)
tGetAgent' skipErr h = do
(corrId, connId, cmdOrErr) <- pGetAgent skipErr h
case cmdOrErr of
Right (APC e cmd) -> case testEquality e (sAEntity @e) of
Just Refl -> pure (corrId, connId, Right cmd)
_ -> error $ "unexpected command " <> show cmd
Left err -> pure (corrId, connId, Left err)
pGetAgent :: forall c. Transport c => c -> IO (ATransmissionOrError 'Agent)
pGetAgent h = do
pGetAgent :: forall c. Transport c => Bool -> c -> IO (ATransmissionOrError 'Agent)
pGetAgent skipErr h = do
(corrId, connId, cmdOrErr) <- tGet SAgent h
case cmdOrErr of
Right (APC _ CONNECT {}) -> pGetAgent h
Right (APC _ DISCONNECT {}) -> pGetAgent h
Right (APC _ CONNECT {}) -> pGetAgent skipErr h
Right (APC _ DISCONNECT {}) -> pGetAgent skipErr h
Right (APC _ (ERR (BROKER _ NETWORK))) | skipErr -> pGetAgent skipErr h
cmd -> pure (corrId, connId, cmd)
-- | receive message to handle `h`
@@ -119,15 +120,18 @@ pGetAgent h = do
(<#:) = tGetAgent
(<#:?) :: Transport c => c -> IO (ATransmissionOrError 'Agent)
(<#:?) = pGetAgent
(<#:?) = pGetAgent True
(<#:.) :: Transport c => c -> IO (AEntityTransmissionOrError 'Agent 'AENone)
(<#:.) = tGetAgent'
(<#:.) = tGetAgent' True
-- | send transmission `t` to handle `h` and get response
(#:) :: Transport c => c -> (ByteString, ByteString, ByteString) -> IO (AEntityTransmissionOrError 'Agent 'AEConn)
h #: t = tPutRaw h t >> (<#:) h
(#:!) :: Transport c => c -> (ByteString, ByteString, ByteString) -> IO (AEntityTransmissionOrError 'Agent 'AEConn)
h #:! t = tPutRaw h t >> tGetAgent' False h
-- | action and expected response
-- `h #:t #> r` is the test that sends `t` to `h` and validates that the response is `r`
(#>) :: IO (AEntityTransmissionOrError 'Agent 'AEConn) -> AEntityTransmission 'Agent 'AEConn -> Expectation
@@ -426,8 +430,8 @@ testServerConnectionAfterError t _ = do
withAgent1 $ \bob -> do
withAgent2 $ \alice -> do
bob #: ("1", "alice", "SUB") =#> \("1", "alice", ERR (BROKER _ e)) -> e == NETWORK || e == TIMEOUT
alice #: ("1", "bob", "SUB") =#> \("1", "bob", ERR (BROKER _ e)) -> e == NETWORK || e == TIMEOUT
bob #:! ("1", "alice", "SUB") =#> \("1", "alice", ERR (BROKER _ e)) -> e == NETWORK || e == TIMEOUT
alice #:! ("1", "bob", "SUB") =#> \("1", "bob", ERR (BROKER _ e)) -> e == NETWORK || e == TIMEOUT
withServer $ do
alice <#=? \case ("", "bob", APC SAEConn (SENT 4)) -> True; ("", "", APC _ (UP s ["bob"])) -> s == server; _ -> False
alice <#=? \case ("", "bob", APC SAEConn (SENT 4)) -> True; ("", "", APC _ (UP s ["bob"])) -> s == server; _ -> False
+1
View File
@@ -145,6 +145,7 @@ pGet c = do
case cmd of
CONNECT {} -> pGet c
DISCONNECT {} -> pGet c
ERR (BROKER _ NETWORK) -> pGet c
_ -> pure t
pattern CONF :: ConfirmationId -> [SMPServer] -> ConnInfo -> ACommand 'Agent e