Files
simplexmq/src/Simplex/Messaging/Agent/RetryInterval.hs
T
Evgeny Poberezkin 058e3ac55e send/process "quota exceeded" message from SMP server when sender gets ERR QUOTA (#585)
* send "quota exceeded" message from SMP server when sender gets ERR QUOTA (ignored in the agent for now)

* send msg quota to the recipient to indicate that sender got ERR QUOTA, test

* switch between slow/fast retry intervals (tests do not pass yet)

* send QCONT message, refactor RetryInterval, test

* refactor

* remove comment

* remove space

* unit test for withRetryLock2

* refactor
2023-01-04 14:10:13 +00:00

74 lines
2.3 KiB
Haskell

{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Simplex.Messaging.Agent.RetryInterval
( RetryInterval (..),
RetryInterval2 (..),
RetryIntervalMode (..),
withRetryInterval,
withRetryLock2,
)
where
import Control.Concurrent (forkIO, threadDelay)
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Simplex.Messaging.Util (whenM)
import UnliftIO.STM
data RetryInterval = RetryInterval
{ initialInterval :: Int,
increaseAfter :: Int,
maxInterval :: Int
}
data RetryInterval2 = RetryInterval2
{ riSlow :: RetryInterval,
riFast :: RetryInterval
}
data RetryIntervalMode = RISlow | RIFast
deriving (Eq)
withRetryInterval :: forall m. MonadIO m => RetryInterval -> (m () -> m ()) -> m ()
withRetryInterval ri action = callAction 0 $ initialInterval ri
where
callAction :: Int -> Int -> m ()
callAction elapsed delay = action loop
where
loop = do
liftIO $ threadDelay delay
let elapsed' = elapsed + delay
callAction elapsed' $ nextDelay elapsed' delay ri
-- This function allows action to toggle between slow and fast retry intervals.
withRetryLock2 :: forall m. MonadIO m => RetryInterval2 -> TMVar () -> ((RetryIntervalMode -> m ()) -> m ()) -> m ()
withRetryLock2 RetryInterval2 {riSlow, riFast} lock action =
callAction (0, initialInterval riSlow) (0, initialInterval riFast)
where
callAction :: (Int, Int) -> (Int, Int) -> m ()
callAction slow fast = action loop
where
loop = \case
RISlow -> run slow riSlow (`callAction` fast)
RIFast -> run fast riFast (callAction slow)
run (elapsed, delay) ri call = do
wait delay
let elapsed' = elapsed + delay
call (elapsed', nextDelay elapsed' delay ri)
wait delay = do
waiting <- newTVarIO True
_ <- liftIO . forkIO $ do
threadDelay delay
atomically $ whenM (readTVar waiting) $ void $ tryPutTMVar lock ()
atomically $ do
takeTMVar lock
writeTVar waiting False
nextDelay :: Int -> Int -> RetryInterval -> Int
nextDelay elapsed delay RetryInterval {increaseAfter, maxInterval} =
if elapsed < increaseAfter || delay == maxInterval
then delay
else min (delay * 3 `div` 2) maxInterval