From d41c2bec2af2aa77e7d671800c08c9760187dff9 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Wed, 22 Mar 2023 08:42:56 +0000 Subject: [PATCH] store retry intervals to resume from the same interval on restart (#693) * store retry intervals to resume from the same interval on restart * add migration * updates --- simplexmq.cabal | 1 + src/Simplex/Messaging/Agent.hs | 14 ++++-- src/Simplex/Messaging/Agent/Env/SQLite.hs | 2 +- src/Simplex/Messaging/Agent/RetryInterval.hs | 32 ++++++++---- src/Simplex/Messaging/Agent/Store.hs | 2 + src/Simplex/Messaging/Agent/Store/SQLite.hs | 15 ++++-- .../Agent/Store/SQLite/Migrations.hs | 4 +- .../Migrations/M20230320_retry_state.hs | 13 +++++ .../Store/SQLite/Migrations/agent_schema.sql | 2 + tests/CoreTests/RetryIntervalTests.hs | 49 ++++++++++--------- 10 files changed, 90 insertions(+), 44 deletions(-) create mode 100644 src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230320_retry_state.hs diff --git a/simplexmq.cabal b/simplexmq.cabal index 9140410be..3061f6fbd 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -78,6 +78,7 @@ library Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230120_delete_errors Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230217_server_key_hash Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230223_files + Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230320_retry_state Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230401_snd_files Simplex.Messaging.Agent.TAsyncs Simplex.Messaging.Agent.TRcvQueues diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 8d037c185..5b6d475b5 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -1024,8 +1024,9 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl E.try (withStore c $ \db -> getPendingMsgData db connId msgId) >>= \case Left (e :: E.SomeException) -> notify $ MERR mId (INTERNAL $ show e) - Right (rq_, PendingMsgData {msgType, msgBody, msgFlags, internalTs}) -> - withRetryLock2 ri qLock $ \_ loop -> do + Right (rq_, PendingMsgData {msgType, msgBody, msgFlags, msgRetryState, internalTs}) -> do + let ri' = maybe id updateRetryInterval2 msgRetryState ri + withRetryLock2 ri' qLock $ \riState loop -> do resp <- tryError $ case msgType of AM_CONN_INFO -> sendConfirmation c sq msgBody _ -> sendAgentMessage c sq msgFlags msgBody @@ -1036,7 +1037,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl SMP SMP.QUOTA -> case msgType of AM_CONN_INFO -> connError msgId NOT_AVAILABLE AM_CONN_INFO_REPLY -> connError msgId NOT_AVAILABLE - _ -> retrySndOp c $ loop RISlow + _ -> retrySndMsg RISlow SMP SMP.AUTH -> case msgType of AM_CONN_INFO -> connError msgId NOT_AVAILABLE AM_CONN_INFO_REPLY -> connError msgId NOT_AVAILABLE @@ -1045,7 +1046,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl -- because the queue must be secured by the time the confirmation or the first HELLO is received | duplexHandshake == Just True -> connErr | otherwise -> - ifM (msgExpired helloTimeout) connErr (retrySndOp c $ loop RIFast) + ifM (msgExpired helloTimeout) connErr (retrySndMsg RIFast) where connErr = case rq_ of -- party initiating connection @@ -1064,13 +1065,16 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl -- the message sending would be retried | temporaryOrHostError e -> do let timeoutSel = if msgType == AM_HELLO_ then helloTimeout else messageTimeout - ifM (msgExpired timeoutSel) (notifyDel msgId err) (retrySndOp c $ loop RIFast) + ifM (msgExpired timeoutSel) (notifyDel msgId err) (retrySndMsg RIFast) | otherwise -> notifyDel msgId err where msgExpired timeoutSel = do msgTimeout <- asks $ timeoutSel . config currentTime <- liftIO getCurrentTime pure $ diffUTCTime currentTime internalTs > msgTimeout + retrySndMsg riMode = do + withStore' c $ \db -> updatePendingMsgRIState db connId msgId riState + retrySndOp c $ loop riMode Right () -> do case msgType of AM_CONN_INFO -> do diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index 5deb6ed67..27b460cf6 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -137,7 +137,7 @@ defaultMessageRetryInterval = RetryInterval { initialInterval = 60_000000, increaseAfter = 60_000000, - maxInterval = 1200_000000 -- 20min + maxInterval = 3600_000000 -- 1 hour } } diff --git a/src/Simplex/Messaging/Agent/RetryInterval.hs b/src/Simplex/Messaging/Agent/RetryInterval.hs index 28128dab5..1ad820a53 100644 --- a/src/Simplex/Messaging/Agent/RetryInterval.hs +++ b/src/Simplex/Messaging/Agent/RetryInterval.hs @@ -1,13 +1,16 @@ {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} module Simplex.Messaging.Agent.RetryInterval ( RetryInterval (..), RetryInterval2 (..), RetryIntervalMode (..), + RI2State (..), withRetryInterval, withRetryLock2, + updateRetryInterval2, ) where @@ -28,6 +31,19 @@ data RetryInterval2 = RetryInterval2 riFast :: RetryInterval } +data RI2State = RI2State + { slowInterval :: Int, + fastInterval :: Int + } + deriving (Show) + +updateRetryInterval2 :: RI2State -> RetryInterval2 -> RetryInterval2 +updateRetryInterval2 RI2State {slowInterval, fastInterval} RetryInterval2 {riSlow, riFast} = + RetryInterval2 + { riSlow = riSlow {initialInterval = slowInterval, increaseAfter = 0}, + riFast = riFast {initialInterval = fastInterval, increaseAfter = 0} + } + data RetryIntervalMode = RISlow | RIFast deriving (Eq, Show) @@ -43,18 +59,16 @@ withRetryInterval ri action = callAction 0 $ initialInterval ri callAction elapsed' $ nextDelay elapsed' delay ri -- This function allows action to toggle between slow and fast retry intervals. -withRetryLock2 :: forall m. MonadIO m => RetryInterval2 -> TMVar () -> ((RetryIntervalMode, Int) -> (RetryIntervalMode -> m ()) -> m ()) -> m () +withRetryLock2 :: forall m. MonadIO m => RetryInterval2 -> TMVar () -> (RI2State -> (RetryIntervalMode -> m ()) -> m ()) -> m () withRetryLock2 RetryInterval2 {riSlow, riFast} lock action = - callAction (RIFast, 0) (0, initialInterval riSlow) (0, initialInterval riFast) + callAction (0, initialInterval riSlow) (0, initialInterval riFast) where - callAction :: (RetryIntervalMode, Int) -> (Int, Int) -> (Int, Int) -> m () - callAction retryState slow fast = action retryState loop + callAction :: (Int, Int) -> (Int, Int) -> m () + callAction slow fast = action (RI2State (snd slow) (snd fast)) loop where - loop mode = case mode of - RISlow -> run slow riSlow (\ri -> callAction (state ri) ri fast) - RIFast -> run fast riFast (\ri -> callAction (state ri) slow ri) - where - state ri = (mode, snd ri) + loop = \case + RISlow -> run slow riSlow (`callAction` fast) + RIFast -> run fast riFast (callAction slow) run (elapsed, delay) ri call = do wait delay let elapsed' = elapsed + delay diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index cf139c96b..248e52fc9 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -24,6 +24,7 @@ import qualified Data.List.NonEmpty as L import Data.Time (UTCTime) import Data.Type.Equality import Simplex.Messaging.Agent.Protocol +import Simplex.Messaging.Agent.RetryInterval (RI2State) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Crypto.Ratchet (RatchetX448) import Simplex.Messaging.Encoding.String @@ -453,6 +454,7 @@ data PendingMsgData = PendingMsgData msgType :: AgentMessageType, msgFlags :: MsgFlags, msgBody :: MsgBody, + msgRetryState :: Maybe RI2State, internalTs :: InternalTs } deriving (Show) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 2ad2015dd..3f141ec33 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -84,6 +84,7 @@ module Simplex.Messaging.Agent.Store.SQLite createSndMsg, createSndMsgDelivery, getPendingMsgData, + updatePendingMsgRIState, getPendingMsgs, deletePendingMsgs, setMsgUserAck, @@ -184,6 +185,7 @@ import Simplex.FileTransfer.Description import Simplex.FileTransfer.Protocol (FileParty (..)) import Simplex.FileTransfer.Types import Simplex.Messaging.Agent.Protocol +import Simplex.Messaging.Agent.RetryInterval (RI2State (..)) import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Store.SQLite.Migrations (Migration) import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations @@ -762,16 +764,21 @@ getPendingMsgData db connId msgId = do DB.query db [sql| - SELECT m.msg_type, m.msg_flags, m.msg_body, m.internal_ts + SELECT m.msg_type, m.msg_flags, m.msg_body, m.internal_ts, s.retry_int_slow, s.retry_int_fast FROM messages m JOIN snd_messages s ON s.conn_id = m.conn_id AND s.internal_id = m.internal_id WHERE m.conn_id = ? AND m.internal_id = ? |] (connId, msgId) - pendingMsgData :: (AgentMessageType, Maybe MsgFlags, MsgBody, InternalTs) -> PendingMsgData - pendingMsgData (msgType, msgFlags_, msgBody, internalTs) = + pendingMsgData :: (AgentMessageType, Maybe MsgFlags, MsgBody, InternalTs, Maybe Int, Maybe Int) -> PendingMsgData + pendingMsgData (msgType, msgFlags_, msgBody, internalTs, riSlow_, riFast_) = let msgFlags = fromMaybe SMP.noMsgFlags msgFlags_ - in PendingMsgData {msgId, msgType, msgFlags, msgBody, internalTs} + msgRetryState = RI2State <$> riSlow_ <*> riFast_ + in PendingMsgData {msgId, msgType, msgFlags, msgBody, msgRetryState, internalTs} + +updatePendingMsgRIState :: DB.Connection -> ConnId -> InternalId -> RI2State -> IO () +updatePendingMsgRIState db connId msgId RI2State {slowInterval, fastInterval} = + DB.execute db "UPDATE snd_messages SET retry_int_slow = ?, retry_int_fast = ? WHERE conn_id = ? AND internal_id = ?" (slowInterval, fastInterval, connId, msgId) getPendingMsgs :: DB.Connection -> ConnId -> SndQueue -> IO [InternalId] getPendingMsgs db connId SndQueue {dbQueueId} = diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs index 7cdc153a4..83855bc09 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs @@ -42,6 +42,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230117_fkey_indexes import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230120_delete_errors import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230217_server_key_hash import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230223_files +import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230320_retry_state import Simplex.Messaging.Encoding.String import Simplex.Messaging.Transport.Client (TransportHost) @@ -63,7 +64,8 @@ schemaMigrations = ("m20230117_fkey_indexes", m20230117_fkey_indexes), ("m20230120_delete_errors", m20230120_delete_errors), ("m20230217_server_key_hash", m20230217_server_key_hash), - ("m20230223_files", m20230223_files) + ("m20230223_files", m20230223_files), + ("m20230320_retry_state", m20230320_retry_state) ] -- | The list of migrations in ascending order by date diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230320_retry_state.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230320_retry_state.hs new file mode 100644 index 000000000..f48941fca --- /dev/null +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230320_retry_state.hs @@ -0,0 +1,13 @@ +{-# LANGUAGE QuasiQuotes #-} + +module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230320_retry_state where + +import Database.SQLite.Simple (Query) +import Database.SQLite.Simple.QQ (sql) + +m20230320_retry_state :: Query +m20230320_retry_state = + [sql| +ALTER TABLE snd_messages ADD COLUMN retry_int_slow INTEGER; +ALTER TABLE snd_messages ADD COLUMN retry_int_fast INTEGER; +|] diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql index 29de284f9..696b438cd 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql @@ -112,6 +112,8 @@ CREATE TABLE snd_messages( internal_id INTEGER NOT NULL, internal_hash BLOB NOT NULL, previous_msg_hash BLOB NOT NULL DEFAULT x'', + retry_int_slow INTEGER, + retry_int_fast INTEGER, PRIMARY KEY(conn_id, internal_snd_id), FOREIGN KEY(conn_id, internal_id) REFERENCES messages ON DELETE CASCADE diff --git a/tests/CoreTests/RetryIntervalTests.hs b/tests/CoreTests/RetryIntervalTests.hs index 6bede691a..d49bd3d14 100644 --- a/tests/CoreTests/RetryIntervalTests.hs +++ b/tests/CoreTests/RetryIntervalTests.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE ScopedTypeVariables #-} module CoreTests.RetryIntervalTests where @@ -38,21 +39,21 @@ testRetryIntervalSameMode = intervals <- newTVarIO [] reportedIntervals <- newTVarIO [] ts <- newTVarIO =<< getCurrentTime - withRetryLock2 testRI lock $ \ri loop -> do + withRetryLock2 testRI lock $ \(RI2State slow fast) loop -> do ints <- addInterval intervals ts - atomically $ modifyTVar' reportedIntervals (ri :) + atomically $ modifyTVar' reportedIntervals ((slow, fast) :) when (length ints < 9) $ loop RIFast (reverse <$> readTVarIO intervals) `shouldReturn` [0, 1, 1, 1, 2, 3, 4, 4, 4] (reverse <$> readTVarIO reportedIntervals) - `shouldReturn` [ (RIFast, 0), - (RIFast, 10000), - (RIFast, 15000), - (RIFast, 22500), - (RIFast, 33750), - (RIFast, 40000), - (RIFast, 40000), - (RIFast, 40000), - (RIFast, 40000) + `shouldReturn` [ (20000, 10000), + (20000, 10000), + (20000, 15000), + (20000, 22500), + (20000, 33750), + (20000, 40000), + (20000, 40000), + (20000, 40000), + (20000, 40000) ] testRetryIntervalSwitchMode :: Spec @@ -62,23 +63,23 @@ testRetryIntervalSwitchMode = intervals <- newTVarIO [] reportedIntervals <- newTVarIO [] ts <- newTVarIO =<< getCurrentTime - withRetryLock2 testRI lock $ \ri loop -> do + withRetryLock2 testRI lock $ \(RI2State slow fast) loop -> do ints <- addInterval intervals ts - atomically $ modifyTVar' reportedIntervals (ri :) + atomically $ modifyTVar' reportedIntervals ((slow, fast) :) when (length ints < 11) $ loop $ if length ints <= 5 then RIFast else RISlow (reverse <$> readTVarIO intervals) `shouldReturn` [0, 1, 1, 1, 2, 3, 2, 2, 3, 4, 4] (reverse <$> readTVarIO reportedIntervals) - `shouldReturn` [ (RIFast, 0), - (RIFast, 10000), - (RIFast, 15000), - (RIFast, 22500), - (RIFast, 33750), - (RIFast, 40000), - (RISlow, 20000), - (RISlow, 30000), - (RISlow, 40000), - (RISlow, 40000), - (RISlow, 40000) + `shouldReturn` [ (20000, 10000), + (20000, 10000), + (20000, 15000), + (20000, 22500), + (20000, 33750), + (20000, 40000), + (20000, 40000), + (30000, 40000), + (40000, 40000), + (40000, 40000), + (40000, 40000) ] addInterval :: TVar [Int] -> TVar UTCTime -> IO [Int]