Merge branch 'master' into xftp

This commit is contained in:
spacedandy
2023-03-03 19:16:12 +04:00
7 changed files with 58 additions and 27 deletions
+2 -2
View File
@@ -896,7 +896,7 @@ runCommandProcessing c@AgentClient {subQ} server_ = do
withStore c (`getConn` connId) >>= \case
SomeConn _ conn@DuplexConnection {} -> a conn
_ -> internalErr "command requires duplex connection"
tryCommand action = withRetryInterval ri $ \loop ->
tryCommand action = withRetryInterval ri $ \_ loop ->
tryError action >>= \case
Left e
| temporaryOrHostError e -> retrySndOp c loop
@@ -1002,7 +1002,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl
Left (e :: E.SomeException) ->
notify $ MERR mId (INTERNAL $ show e)
Right (rq_, PendingMsgData {msgType, msgBody, msgFlags, internalTs}) ->
withRetryLock2 ri qLock $ \loop -> do
withRetryLock2 ri qLock $ \_ loop -> do
resp <- tryError $ case msgType of
AM_CONN_INFO -> sendConfirmation c sq msgBody
_ -> sendAgentMessage c sq msgFlags msgBody
+2 -2
View File
@@ -421,7 +421,7 @@ reconnectServer c tSess = newAsyncAction tryReconnectSMPClient $ reconnections c
where
tryReconnectSMPClient aId = do
ri <- asks $ reconnectInterval . config
withRetryInterval ri $ \loop ->
withRetryInterval ri $ \_ loop ->
reconnectSMPClient c tSess `catchError` const loop
atomically . removeAsyncAction aId $ reconnections c
@@ -537,7 +537,7 @@ newProtocolClient c tSess@(userId, srv, entityId_) clients connectClient reconne
connectAsync :: Int -> m ()
connectAsync aId = do
ri <- asks $ reconnectInterval . config
withRetryInterval ri $ \loop -> void $ tryConnectClient (const $ reconnectClient c tSess) loop
withRetryInterval ri $ \_ loop -> void $ tryConnectClient (const $ reconnectClient c tSess) loop
atomically . removeAsyncAction aId $ asyncClients c
hostEvent :: forall err msg. (ProtocolTypeI (ProtoType msg), ProtocolServerClient err msg) => (AProtocolType -> TransportHost -> ACommand 'Agent) -> Client msg -> ACommand 'Agent
@@ -175,7 +175,7 @@ runNtfWorker c srv doWork = do
Nothing -> noWorkToDo
Just a@(NtfSubscription {connId}, _, _) -> do
ri <- asks $ reconnectInterval . config
withRetryInterval ri $ \loop ->
withRetryInterval ri $ \_ loop ->
processAction a
`catchError` retryOnError c "NtfWorker" loop (workerInternalError c connId . show)
noWorkToDo = void . atomically $ tryTakeTMVar doWork
@@ -259,7 +259,7 @@ runNtfSMPWorker c srv doWork = do
Nothing -> noWorkToDo
Just a@(NtfSubscription {connId}, _, _) -> do
ri <- asks $ reconnectInterval . config
withRetryInterval ri $ \loop ->
withRetryInterval ri $ \_ loop ->
processAction a
`catchError` retryOnError c "NtfSMPWorker" loop (workerInternalError c connId . show)
noWorkToDo = void . atomically $ tryTakeTMVar doWork
+14 -11
View File
@@ -29,13 +29,13 @@ data RetryInterval2 = RetryInterval2
}
data RetryIntervalMode = RISlow | RIFast
deriving (Eq)
deriving (Eq, Show)
withRetryInterval :: forall m. MonadIO m => RetryInterval -> (m () -> m ()) -> m ()
withRetryInterval :: forall m. MonadIO m => RetryInterval -> (Int -> m () -> m ()) -> m ()
withRetryInterval ri action = callAction 0 $ initialInterval ri
where
callAction :: Int -> Int -> m ()
callAction elapsed delay = action loop
callAction elapsed delay = action delay loop
where
loop = do
liftIO $ threadDelay delay
@@ -43,20 +43,23 @@ withRetryInterval ri action = callAction 0 $ initialInterval ri
callAction elapsed' $ nextDelay elapsed' delay ri
-- This function allows action to toggle between slow and fast retry intervals.
withRetryLock2 :: forall m. MonadIO m => RetryInterval2 -> TMVar () -> ((RetryIntervalMode -> m ()) -> m ()) -> m ()
withRetryLock2 :: forall m. MonadIO m => RetryInterval2 -> TMVar () -> ((RetryIntervalMode, Int) -> (RetryIntervalMode -> m ()) -> m ()) -> m ()
withRetryLock2 RetryInterval2 {riSlow, riFast} lock action =
callAction (0, initialInterval riSlow) (0, initialInterval riFast)
callAction (RIFast, 0) (0, initialInterval riSlow) (0, initialInterval riFast)
where
callAction :: (Int, Int) -> (Int, Int) -> m ()
callAction slow fast = action loop
callAction :: (RetryIntervalMode, Int) -> (Int, Int) -> (Int, Int) -> m ()
callAction retryState slow fast = action retryState loop
where
loop = \case
RISlow -> run slow riSlow (`callAction` fast)
RIFast -> run fast riFast (callAction slow)
loop mode = case mode of
RISlow -> run slow riSlow (\ri -> callAction (state ri) ri fast)
RIFast -> run fast riFast (\ri -> callAction (state ri) slow ri)
where
state ri = (mode, snd ri)
run (elapsed, delay) ri call = do
wait delay
let elapsed' = elapsed + delay
call (elapsed', nextDelay elapsed' delay ri)
delay' = nextDelay elapsed' delay ri
call (elapsed', delay')
wait delay = do
waiting <- newTVarIO True
_ <- liftIO . forkIO $ do
+2 -2
View File
@@ -156,7 +156,7 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, msgQ} srv =
atomically $ modifyTVar' (asyncClients ca) (a :)
connectAsync :: ExceptT SMPClientError IO ()
connectAsync =
withRetryInterval (reconnectInterval agentCfg) $ \loop ->
withRetryInterval (reconnectInterval agentCfg) $ \_ loop ->
void $ tryConnectClient (const reconnectClient) loop
connectClient :: ExceptT SMPClientError IO SMPClient
@@ -195,7 +195,7 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, msgQ} srv =
tryReconnectClient :: ExceptT SMPClientError IO ()
tryReconnectClient = do
withRetryInterval (reconnectInterval agentCfg) $ \loop ->
withRetryInterval (reconnectInterval agentCfg) $ \_ loop ->
reconnectClient `catchE` const loop
reconnectClient :: ExceptT SMPClientError IO ()