mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-10 23:37:00 +00:00
store retry intervals to resume from the same interval on restart (#693)
* store retry intervals to resume from the same interval on restart * add migration * updates
This commit is contained in:
committed by
GitHub
parent
7e265e5c59
commit
d41c2bec2a
@@ -78,6 +78,7 @@ library
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230120_delete_errors
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230217_server_key_hash
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230223_files
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230320_retry_state
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230401_snd_files
|
||||
Simplex.Messaging.Agent.TAsyncs
|
||||
Simplex.Messaging.Agent.TRcvQueues
|
||||
|
||||
@@ -1024,8 +1024,9 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl
|
||||
E.try (withStore c $ \db -> getPendingMsgData db connId msgId) >>= \case
|
||||
Left (e :: E.SomeException) ->
|
||||
notify $ MERR mId (INTERNAL $ show e)
|
||||
Right (rq_, PendingMsgData {msgType, msgBody, msgFlags, internalTs}) ->
|
||||
withRetryLock2 ri qLock $ \_ loop -> do
|
||||
Right (rq_, PendingMsgData {msgType, msgBody, msgFlags, msgRetryState, internalTs}) -> do
|
||||
let ri' = maybe id updateRetryInterval2 msgRetryState ri
|
||||
withRetryLock2 ri' qLock $ \riState loop -> do
|
||||
resp <- tryError $ case msgType of
|
||||
AM_CONN_INFO -> sendConfirmation c sq msgBody
|
||||
_ -> sendAgentMessage c sq msgFlags msgBody
|
||||
@@ -1036,7 +1037,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl
|
||||
SMP SMP.QUOTA -> case msgType of
|
||||
AM_CONN_INFO -> connError msgId NOT_AVAILABLE
|
||||
AM_CONN_INFO_REPLY -> connError msgId NOT_AVAILABLE
|
||||
_ -> retrySndOp c $ loop RISlow
|
||||
_ -> retrySndMsg RISlow
|
||||
SMP SMP.AUTH -> case msgType of
|
||||
AM_CONN_INFO -> connError msgId NOT_AVAILABLE
|
||||
AM_CONN_INFO_REPLY -> connError msgId NOT_AVAILABLE
|
||||
@@ -1045,7 +1046,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl
|
||||
-- because the queue must be secured by the time the confirmation or the first HELLO is received
|
||||
| duplexHandshake == Just True -> connErr
|
||||
| otherwise ->
|
||||
ifM (msgExpired helloTimeout) connErr (retrySndOp c $ loop RIFast)
|
||||
ifM (msgExpired helloTimeout) connErr (retrySndMsg RIFast)
|
||||
where
|
||||
connErr = case rq_ of
|
||||
-- party initiating connection
|
||||
@@ -1064,13 +1065,16 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl
|
||||
-- the message sending would be retried
|
||||
| temporaryOrHostError e -> do
|
||||
let timeoutSel = if msgType == AM_HELLO_ then helloTimeout else messageTimeout
|
||||
ifM (msgExpired timeoutSel) (notifyDel msgId err) (retrySndOp c $ loop RIFast)
|
||||
ifM (msgExpired timeoutSel) (notifyDel msgId err) (retrySndMsg RIFast)
|
||||
| otherwise -> notifyDel msgId err
|
||||
where
|
||||
msgExpired timeoutSel = do
|
||||
msgTimeout <- asks $ timeoutSel . config
|
||||
currentTime <- liftIO getCurrentTime
|
||||
pure $ diffUTCTime currentTime internalTs > msgTimeout
|
||||
retrySndMsg riMode = do
|
||||
withStore' c $ \db -> updatePendingMsgRIState db connId msgId riState
|
||||
retrySndOp c $ loop riMode
|
||||
Right () -> do
|
||||
case msgType of
|
||||
AM_CONN_INFO -> do
|
||||
|
||||
@@ -137,7 +137,7 @@ defaultMessageRetryInterval =
|
||||
RetryInterval
|
||||
{ initialInterval = 60_000000,
|
||||
increaseAfter = 60_000000,
|
||||
maxInterval = 1200_000000 -- 20min
|
||||
maxInterval = 3600_000000 -- 1 hour
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,13 +1,16 @@
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
|
||||
module Simplex.Messaging.Agent.RetryInterval
|
||||
( RetryInterval (..),
|
||||
RetryInterval2 (..),
|
||||
RetryIntervalMode (..),
|
||||
RI2State (..),
|
||||
withRetryInterval,
|
||||
withRetryLock2,
|
||||
updateRetryInterval2,
|
||||
)
|
||||
where
|
||||
|
||||
@@ -28,6 +31,19 @@ data RetryInterval2 = RetryInterval2
|
||||
riFast :: RetryInterval
|
||||
}
|
||||
|
||||
data RI2State = RI2State
|
||||
{ slowInterval :: Int,
|
||||
fastInterval :: Int
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
updateRetryInterval2 :: RI2State -> RetryInterval2 -> RetryInterval2
|
||||
updateRetryInterval2 RI2State {slowInterval, fastInterval} RetryInterval2 {riSlow, riFast} =
|
||||
RetryInterval2
|
||||
{ riSlow = riSlow {initialInterval = slowInterval, increaseAfter = 0},
|
||||
riFast = riFast {initialInterval = fastInterval, increaseAfter = 0}
|
||||
}
|
||||
|
||||
data RetryIntervalMode = RISlow | RIFast
|
||||
deriving (Eq, Show)
|
||||
|
||||
@@ -43,18 +59,16 @@ withRetryInterval ri action = callAction 0 $ initialInterval ri
|
||||
callAction elapsed' $ nextDelay elapsed' delay ri
|
||||
|
||||
-- This function allows action to toggle between slow and fast retry intervals.
|
||||
withRetryLock2 :: forall m. MonadIO m => RetryInterval2 -> TMVar () -> ((RetryIntervalMode, Int) -> (RetryIntervalMode -> m ()) -> m ()) -> m ()
|
||||
withRetryLock2 :: forall m. MonadIO m => RetryInterval2 -> TMVar () -> (RI2State -> (RetryIntervalMode -> m ()) -> m ()) -> m ()
|
||||
withRetryLock2 RetryInterval2 {riSlow, riFast} lock action =
|
||||
callAction (RIFast, 0) (0, initialInterval riSlow) (0, initialInterval riFast)
|
||||
callAction (0, initialInterval riSlow) (0, initialInterval riFast)
|
||||
where
|
||||
callAction :: (RetryIntervalMode, Int) -> (Int, Int) -> (Int, Int) -> m ()
|
||||
callAction retryState slow fast = action retryState loop
|
||||
callAction :: (Int, Int) -> (Int, Int) -> m ()
|
||||
callAction slow fast = action (RI2State (snd slow) (snd fast)) loop
|
||||
where
|
||||
loop mode = case mode of
|
||||
RISlow -> run slow riSlow (\ri -> callAction (state ri) ri fast)
|
||||
RIFast -> run fast riFast (\ri -> callAction (state ri) slow ri)
|
||||
where
|
||||
state ri = (mode, snd ri)
|
||||
loop = \case
|
||||
RISlow -> run slow riSlow (`callAction` fast)
|
||||
RIFast -> run fast riFast (callAction slow)
|
||||
run (elapsed, delay) ri call = do
|
||||
wait delay
|
||||
let elapsed' = elapsed + delay
|
||||
|
||||
@@ -24,6 +24,7 @@ import qualified Data.List.NonEmpty as L
|
||||
import Data.Time (UTCTime)
|
||||
import Data.Type.Equality
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
import Simplex.Messaging.Agent.RetryInterval (RI2State)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Crypto.Ratchet (RatchetX448)
|
||||
import Simplex.Messaging.Encoding.String
|
||||
@@ -453,6 +454,7 @@ data PendingMsgData = PendingMsgData
|
||||
msgType :: AgentMessageType,
|
||||
msgFlags :: MsgFlags,
|
||||
msgBody :: MsgBody,
|
||||
msgRetryState :: Maybe RI2State,
|
||||
internalTs :: InternalTs
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
@@ -84,6 +84,7 @@ module Simplex.Messaging.Agent.Store.SQLite
|
||||
createSndMsg,
|
||||
createSndMsgDelivery,
|
||||
getPendingMsgData,
|
||||
updatePendingMsgRIState,
|
||||
getPendingMsgs,
|
||||
deletePendingMsgs,
|
||||
setMsgUserAck,
|
||||
@@ -184,6 +185,7 @@ import Simplex.FileTransfer.Description
|
||||
import Simplex.FileTransfer.Protocol (FileParty (..))
|
||||
import Simplex.FileTransfer.Types
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
import Simplex.Messaging.Agent.RetryInterval (RI2State (..))
|
||||
import Simplex.Messaging.Agent.Store
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Migrations (Migration)
|
||||
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
|
||||
@@ -762,16 +764,21 @@ getPendingMsgData db connId msgId = do
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT m.msg_type, m.msg_flags, m.msg_body, m.internal_ts
|
||||
SELECT m.msg_type, m.msg_flags, m.msg_body, m.internal_ts, s.retry_int_slow, s.retry_int_fast
|
||||
FROM messages m
|
||||
JOIN snd_messages s ON s.conn_id = m.conn_id AND s.internal_id = m.internal_id
|
||||
WHERE m.conn_id = ? AND m.internal_id = ?
|
||||
|]
|
||||
(connId, msgId)
|
||||
pendingMsgData :: (AgentMessageType, Maybe MsgFlags, MsgBody, InternalTs) -> PendingMsgData
|
||||
pendingMsgData (msgType, msgFlags_, msgBody, internalTs) =
|
||||
pendingMsgData :: (AgentMessageType, Maybe MsgFlags, MsgBody, InternalTs, Maybe Int, Maybe Int) -> PendingMsgData
|
||||
pendingMsgData (msgType, msgFlags_, msgBody, internalTs, riSlow_, riFast_) =
|
||||
let msgFlags = fromMaybe SMP.noMsgFlags msgFlags_
|
||||
in PendingMsgData {msgId, msgType, msgFlags, msgBody, internalTs}
|
||||
msgRetryState = RI2State <$> riSlow_ <*> riFast_
|
||||
in PendingMsgData {msgId, msgType, msgFlags, msgBody, msgRetryState, internalTs}
|
||||
|
||||
updatePendingMsgRIState :: DB.Connection -> ConnId -> InternalId -> RI2State -> IO ()
|
||||
updatePendingMsgRIState db connId msgId RI2State {slowInterval, fastInterval} =
|
||||
DB.execute db "UPDATE snd_messages SET retry_int_slow = ?, retry_int_fast = ? WHERE conn_id = ? AND internal_id = ?" (slowInterval, fastInterval, connId, msgId)
|
||||
|
||||
getPendingMsgs :: DB.Connection -> ConnId -> SndQueue -> IO [InternalId]
|
||||
getPendingMsgs db connId SndQueue {dbQueueId} =
|
||||
|
||||
@@ -42,6 +42,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230117_fkey_indexes
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230120_delete_errors
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230217_server_key_hash
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230223_files
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230320_retry_state
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Transport.Client (TransportHost)
|
||||
|
||||
@@ -63,7 +64,8 @@ schemaMigrations =
|
||||
("m20230117_fkey_indexes", m20230117_fkey_indexes),
|
||||
("m20230120_delete_errors", m20230120_delete_errors),
|
||||
("m20230217_server_key_hash", m20230217_server_key_hash),
|
||||
("m20230223_files", m20230223_files)
|
||||
("m20230223_files", m20230223_files),
|
||||
("m20230320_retry_state", m20230320_retry_state)
|
||||
]
|
||||
|
||||
-- | The list of migrations in ascending order by date
|
||||
|
||||
@@ -0,0 +1,13 @@
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
|
||||
module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230320_retry_state where
|
||||
|
||||
import Database.SQLite.Simple (Query)
|
||||
import Database.SQLite.Simple.QQ (sql)
|
||||
|
||||
m20230320_retry_state :: Query
|
||||
m20230320_retry_state =
|
||||
[sql|
|
||||
ALTER TABLE snd_messages ADD COLUMN retry_int_slow INTEGER;
|
||||
ALTER TABLE snd_messages ADD COLUMN retry_int_fast INTEGER;
|
||||
|]
|
||||
@@ -112,6 +112,8 @@ CREATE TABLE snd_messages(
|
||||
internal_id INTEGER NOT NULL,
|
||||
internal_hash BLOB NOT NULL,
|
||||
previous_msg_hash BLOB NOT NULL DEFAULT x'',
|
||||
retry_int_slow INTEGER,
|
||||
retry_int_fast INTEGER,
|
||||
PRIMARY KEY(conn_id, internal_snd_id),
|
||||
FOREIGN KEY(conn_id, internal_id) REFERENCES messages
|
||||
ON DELETE CASCADE
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
|
||||
module CoreTests.RetryIntervalTests where
|
||||
@@ -38,21 +39,21 @@ testRetryIntervalSameMode =
|
||||
intervals <- newTVarIO []
|
||||
reportedIntervals <- newTVarIO []
|
||||
ts <- newTVarIO =<< getCurrentTime
|
||||
withRetryLock2 testRI lock $ \ri loop -> do
|
||||
withRetryLock2 testRI lock $ \(RI2State slow fast) loop -> do
|
||||
ints <- addInterval intervals ts
|
||||
atomically $ modifyTVar' reportedIntervals (ri :)
|
||||
atomically $ modifyTVar' reportedIntervals ((slow, fast) :)
|
||||
when (length ints < 9) $ loop RIFast
|
||||
(reverse <$> readTVarIO intervals) `shouldReturn` [0, 1, 1, 1, 2, 3, 4, 4, 4]
|
||||
(reverse <$> readTVarIO reportedIntervals)
|
||||
`shouldReturn` [ (RIFast, 0),
|
||||
(RIFast, 10000),
|
||||
(RIFast, 15000),
|
||||
(RIFast, 22500),
|
||||
(RIFast, 33750),
|
||||
(RIFast, 40000),
|
||||
(RIFast, 40000),
|
||||
(RIFast, 40000),
|
||||
(RIFast, 40000)
|
||||
`shouldReturn` [ (20000, 10000),
|
||||
(20000, 10000),
|
||||
(20000, 15000),
|
||||
(20000, 22500),
|
||||
(20000, 33750),
|
||||
(20000, 40000),
|
||||
(20000, 40000),
|
||||
(20000, 40000),
|
||||
(20000, 40000)
|
||||
]
|
||||
|
||||
testRetryIntervalSwitchMode :: Spec
|
||||
@@ -62,23 +63,23 @@ testRetryIntervalSwitchMode =
|
||||
intervals <- newTVarIO []
|
||||
reportedIntervals <- newTVarIO []
|
||||
ts <- newTVarIO =<< getCurrentTime
|
||||
withRetryLock2 testRI lock $ \ri loop -> do
|
||||
withRetryLock2 testRI lock $ \(RI2State slow fast) loop -> do
|
||||
ints <- addInterval intervals ts
|
||||
atomically $ modifyTVar' reportedIntervals (ri :)
|
||||
atomically $ modifyTVar' reportedIntervals ((slow, fast) :)
|
||||
when (length ints < 11) $ loop $ if length ints <= 5 then RIFast else RISlow
|
||||
(reverse <$> readTVarIO intervals) `shouldReturn` [0, 1, 1, 1, 2, 3, 2, 2, 3, 4, 4]
|
||||
(reverse <$> readTVarIO reportedIntervals)
|
||||
`shouldReturn` [ (RIFast, 0),
|
||||
(RIFast, 10000),
|
||||
(RIFast, 15000),
|
||||
(RIFast, 22500),
|
||||
(RIFast, 33750),
|
||||
(RIFast, 40000),
|
||||
(RISlow, 20000),
|
||||
(RISlow, 30000),
|
||||
(RISlow, 40000),
|
||||
(RISlow, 40000),
|
||||
(RISlow, 40000)
|
||||
`shouldReturn` [ (20000, 10000),
|
||||
(20000, 10000),
|
||||
(20000, 15000),
|
||||
(20000, 22500),
|
||||
(20000, 33750),
|
||||
(20000, 40000),
|
||||
(20000, 40000),
|
||||
(30000, 40000),
|
||||
(40000, 40000),
|
||||
(40000, 40000),
|
||||
(40000, 40000)
|
||||
]
|
||||
|
||||
addInterval :: TVar [Int] -> TVar UTCTime -> IO [Int]
|
||||
|
||||
Reference in New Issue
Block a user