From 8f66a0cc982611bb6223d4b5e902cd0ff9d119e3 Mon Sep 17 00:00:00 2001 From: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> Date: Sat, 2 May 2026 10:31:03 +0000 Subject: [PATCH] core: fix relay request worker retry limit (#6931) * core: fix relay request worker retry limit * update plan * update plan * update plan * wip * schema * wip * wip * schema * update * remove comment * rework * schema * update * schema * update * plans * corrections * add 1 second * remove + 1 * add +1 to schedule * changes * updated schemas --------- Co-authored-by: Evgeny @ SimpleX Chat <259188159+evgeny-simplex@users.noreply.github.com> Co-authored-by: Evgeny Poberezkin --- plans/2026-04-29-relay-request-retry-limit.md | 203 ++++++++++++++++++ simplex-chat.cabal | 2 + src/Simplex/Chat.hs | 6 +- src/Simplex/Chat/Controller.hs | 3 + src/Simplex/Chat/Library/Subscriber.hs | 66 ++++-- src/Simplex/Chat/Store/Groups.hs | 14 +- src/Simplex/Chat/Store/Postgres/Migrations.hs | 4 +- .../M20260429_relay_request_retries.hs | 23 ++ .../Store/Postgres/Migrations/chat_schema.sql | 5 +- src/Simplex/Chat/Store/RelayRequests.hs | 23 +- src/Simplex/Chat/Store/SQLite/Migrations.hs | 4 +- .../M20260429_relay_request_retries.hs | 22 ++ .../SQLite/Migrations/chat_query_plans.txt | 10 +- .../Store/SQLite/Migrations/chat_schema.sql | 5 +- src/Simplex/Chat/Types.hs | 6 +- tests/ChatTests/Groups.hs | 1 - 16 files changed, 359 insertions(+), 38 deletions(-) create mode 100644 plans/2026-04-29-relay-request-retry-limit.md create mode 100644 src/Simplex/Chat/Store/Postgres/Migrations/M20260429_relay_request_retries.hs create mode 100644 src/Simplex/Chat/Store/SQLite/Migrations/M20260429_relay_request_retries.hs diff --git a/plans/2026-04-29-relay-request-retry-limit.md b/plans/2026-04-29-relay-request-retry-limit.md new file mode 100644 index 0000000000..34d0f4c9f0 --- /dev/null +++ b/plans/2026-04-29-relay-request-retry-limit.md @@ -0,0 +1,203 @@ +# Plan: Relay Request Worker Retry Limit + +## Context + +The relay request worker (`runRelayRequestWorker`) processes channel setup requests sequentially using a single worker (`relayRequestWorkerKey = 1`). When a request requires network calls to an unreachable server (e.g., fetching group link data via `getShortLinkConnReq'`), the worker retries indefinitely via `withRetryInterval` + `retryTmpError` — temp/host errors call `loop` with no limit. This blocks all subsequent relay requests from processing. + +This is an attack vector: a channel owner can create a channel link on a server unreachable by the relay, causing the relay request worker to retry forever and blocking all other channel setup requests. + +## Approach + +Follow the XFTP worker retry pattern (`runXFTPDelWorker` in `simplexmq/src/Simplex/FileTransfer/Agent.hs:667`): + +1. **Track retries and delay in DB**: Add `relay_request_retries` and `relay_request_delay` columns to the `groups` table +2. **Order by retries**: Query for next work item ordered by `relay_request_retries ASC, created_at ASC` — items with fewer retries are processed first, stuck items get pushed to the back +3. **Limit consecutive retries**: Replace `withRetryInterval` with `withRetryIntervalCount`, limiting to a small number of consecutive retries per pickup cycle (3, matching XFTP's `xftpConsecutiveRetries`). After the limit, the worker yields and picks the next item. +4. **Store delay for resumption**: On each retry, store the current backoff delay in DB. On next pickup, resume backoff from the stored delay (XFTP pattern: `ri {initialInterval = d, increaseAfter = 0}`) +5. **Expire old requests**: On temp error, before retrying, check if the request is older than 1 day and has 10+ retries — if so, mark as failed instead of retrying. Both conditions must hold — a request that's old but has few retries may just have been delayed, while a request with many retries that's recent is still being actively worked on. + +### How this neutralizes the attack + +- Attacker's request gets picked up, retried 3 times with backoff (~15s total), then yielded +- Worker picks the next item by retry count — legitimate requests (retries=0) go first +- Attacker's request accumulates retries, always processed last +- After 1 day and 10+ retries, the request is marked failed and permanently excluded + +--- + +## Detailed changes + +### 1. Database migration + +New migration: `M20260429_relay_request_retries.hs` + +```sql +ALTER TABLE groups ADD COLUMN relay_request_retries INTEGER NOT NULL DEFAULT 0; +ALTER TABLE groups ADD COLUMN relay_request_delay INTEGER; +``` + +**Files:** +- `src/Simplex/Chat/Store/SQLite/Migrations/M20260429_relay_request_retries.hs` (new) +- `src/Simplex/Chat/Store/SQLite/Migrations.hs` (register) +- `src/Simplex/Chat/Store/Postgres/Migrations/M20260429_relay_request_retries.hs` (new) +- `src/Simplex/Chat/Store/Postgres/Migrations.hs` (register) +- `simplex-chat.cabal` (add modules) + +### 2. Extend RelayRequestData + +**File:** `src/Simplex/Chat/Types.hs` + +```haskell +data RelayRequestData = RelayRequestData + { relayInvId :: InvitationId, + reqGroupLink :: ShortLinkContact, + reqChatVRange :: VersionRangeChat, + relayRequestDelay :: Maybe Int64, + relayRequestRetries :: Int, + relayRequestCreatedAt :: UTCTime + } +``` + +- `relayRequestDelay`: resume backoff from stored position (XFTP pattern) +- `relayRequestRetries`: current retry count, used with `relayRequestCreatedAt` to decide expiry in `retryTmpError` +- `relayRequestCreatedAt`: group creation time, used for the 1-day expiry check + +### 3. Update store functions + +**File:** `src/Simplex/Chat/Store/RelayRequests.hs` + +**`getNextPendingRelayRequest`** — two changes: +- Order by `relay_request_retries ASC, created_at ASC` instead of `group_id ASC` +- SELECT and return `relay_request_delay`, `relay_request_retries`, `created_at` in the data query + +```haskell +getNextPendingRelayRequest db = + getWorkItem "relay request" getNextRequestGroupId getRelayRequestData (markRelayRequestFailed db) + where + getNextRequestGroupId = + maybeFirstRow fromOnly $ + DB.query db + [sql| + SELECT group_id FROM groups + WHERE relay_own_status = ? + AND relay_request_failed = 0 + AND relay_request_err_reason IS NULL + ORDER BY relay_request_retries ASC, created_at ASC + LIMIT 1 + |] + (Only RSInvited) + getRelayRequestData groupId = + firstRow' toRelayRequestData (SEGroupNotFound groupId) $ + DB.query db + [sql| + SELECT relay_request_inv_id, relay_request_group_link, + relay_request_peer_chat_min_version, relay_request_peer_chat_max_version, + relay_request_delay, relay_request_retries, created_at + FROM groups WHERE group_id = ? + |] + (Only groupId) + where + toRelayRequestData (Just relayInvId, Just reqGroupLink, Just minV, Just maxV, relayRequestDelay, relayRequestRetries, relayRequestCreatedAt) = + Right (groupId, RelayRequestData {relayInvId, reqGroupLink, reqChatVRange = fromMaybe (versionToRange maxV) $ safeVersionRange minV maxV, relayRequestDelay, relayRequestRetries, relayRequestCreatedAt}) + toRelayRequestData _ = Left $ SEInternalError "missing relay request data" +``` + +**New function: `updateRelayRequestRetries`**: + +```haskell +updateRelayRequestRetries :: DB.Connection -> GroupId -> Int64 -> IO () +updateRelayRequestRetries db groupId delay = do + currentTs <- getCurrentTime + DB.execute db + "UPDATE groups SET relay_request_retries = relay_request_retries + 1, relay_request_delay = ?, updated_at = ? WHERE group_id = ?" + (delay, currentTs, groupId) +``` + +Export `updateRelayRequestRetries` and `markRelayRequestFailed` from module (the latter is currently internal, used only as a callback in `getWorkItem`). + +### 4. Worker changes + +**File:** `src/Simplex/Chat/Library/Subscriber.hs` + +**Import change**: Add `withRetryIntervalCount` to the import from `Simplex.Messaging.Agent.RetryInterval`. + +**Replace `withRetryInterval` with limited retry** in `runRelayRequestOperation`: + +```haskell +runRelayRequestOperation vr user uclId = + withWork_ a doWork (withStore' getNextPendingRelayRequest) $ + \(groupId, rrd@RelayRequestData {relayRequestDelay}) -> do + ri <- asks $ reconnectInterval . agentConfig . config + let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) relayRequestDelay + withRetryIntervalLimit ri' $ \delay loop -> do + liftIO $ waitWhileSuspended a + liftIO $ waitForUserNetwork a + processRelayRequest groupId rrd `catchAllErrors` retryTmpError loop groupId rrd delay + where + maxConsecutiveRetries :: Int + maxConsecutiveRetries = 3 + withRetryIntervalLimit :: RetryInterval -> (Int64 -> CM () -> CM ()) -> CM () + withRetryIntervalLimit ri action = + withRetryIntervalCount ri $ \n delay loop -> + when (n < maxConsecutiveRetries) $ action delay loop + retryTmpError :: CM () -> GroupId -> RelayRequestData -> Int64 -> ChatError -> CM () + retryTmpError loop groupId RelayRequestData {relayRequestRetries, relayRequestCreatedAt} delay = \case + ChatErrorAgent {agentError} | temporaryOrHostError agentError -> do + currentTs <- liftIO getCurrentTime + if relayRequestRetries >= 10 && diffUTCTime currentTs relayRequestCreatedAt > nominalDay + then withStore' $ \db -> markRelayRequestFailed db groupId + else do + withStore' $ \db -> updateRelayRequestRetries db groupId delay + loop + e -> do + withStore' $ \db -> setRelayRequestErr db groupId (tshow e) + eToView e +``` + +Key changes from current code: +- `withRetryInterval` → `withRetryIntervalCount` wrapped in local `withRetryIntervalLimit` +- Resume from stored delay via `ri'` (XFTP pattern) +- `retryTmpError` receives the full `RelayRequestData` record and destructures the fields it needs +- On temp error: checks if request is older than 1 day with 10+ retries — if so, marks as failed instead of retrying; otherwise increments retries and calls `loop` +- After `maxConsecutiveRetries` (3), the `when` guard exits, worker picks next item + +--- + +## Files to modify + +| File | Change | +|------|--------| +| `src/Simplex/Chat/Store/SQLite/Migrations/M20260429_relay_request_retries.hs` | New migration | +| `src/Simplex/Chat/Store/SQLite/Migrations.hs` | Register migration | +| `src/Simplex/Chat/Store/Postgres/Migrations/M20260429_relay_request_retries.hs` | New migration | +| `src/Simplex/Chat/Store/Postgres/Migrations.hs` | Register migration | +| `simplex-chat.cabal` | Add migration modules | +| `src/Simplex/Chat/Types.hs` | Add `relayRequestDelay`, `relayRequestRetries`, `relayRequestCreatedAt` to `RelayRequestData` | +| `src/Simplex/Chat/Store/RelayRequests.hs` | Retry ordering, `updateRelayRequestRetries` | +| `src/Simplex/Chat/Library/Subscriber.hs` | Limited retry with delay storage, expiry check in `retryTmpError` | + +## Verification + +1. **Build**: `cabal build --ghc-options=-O0` +2. **Run relay tests**: `cabal test simplex-chat-test --test-options='-m "relay"'` +3. **Scenarios**: + - Request to unreachable server: retried 3 times per cycle, pushed to back of queue, marked failed after 1 day and 10+ retries + - Request to reachable server: succeeds on first attempt, unaffected by changes + - Multiple pending requests: stuck request doesn't block others — items with fewer retries processed first + - App restart with expired pending requests: worker starts, picks up expired request, attempts it — if it succeeds (server now reachable), completes normally; if it fails, `retryTmpError` marks it failed + +## Known considerations + +1. **Single stuck item re-pickup**: If only one request is pending and it's stuck, the worker picks it up repeatedly (3 retries each cycle, immediate re-pickup). This is acceptable — backoff grows via stored delay, and the request is marked failed after 1 day and 10+ retries. The main protection is that other requests aren't blocked. + +2. **`hasPendingRelayRequests` unchanged**: Expired requests still match the `hasPendingRelayRequests` query at startup, so the worker starts. It picks them up, attempts processing — if the server became reachable, the request succeeds normally. If it fails, `retryTmpError` checks the expiry condition and marks it failed. This is strictly better than filtering at query time: expired items get one last chance. + +3. **Delay resumption across pickups**: Stored delay resumes backoff at the last level (XFTP pattern). After many cycles, delay reaches `maxInterval` and stays there. This means retry frequency stabilizes at a low rate for stuck items. + +4. **Permanent errors unchanged**: Non-temp errors (validation, logic) still call `setRelayRequestErr` immediately, permanently excluding the item. The retry mechanism only affects `temporaryOrHostError`. + +5. **`withWork_` re-signals work**: After the action returns (hitting max consecutive retries), `withWork_` has already called `hasWork` (re-signaling the doWork TMVar). The outer `forever` loop immediately proceeds to the next iteration. This is the desired behavior — the worker processes all pending items before waiting. + +6. **`retries` count is from pickup time**: The `relayRequestRetries` value in `retryTmpError` is the count loaded when the item was picked up. Within a single pickup cycle (up to 3 consecutive retries), `updateRelayRequestRetries` increments the DB count but the local value stays the same. The expiry check uses the pickup-time count, which is at most 3 behind the DB. This is acceptable — the threshold (10) has margin. + +7. **Migration column defaults**: `relay_request_retries NOT NULL DEFAULT 0` ensures existing pending requests start with 0 retries. `relay_request_delay` is nullable (NULL = use default reconnectInterval), matching the `Maybe Int64` field. diff --git a/simplex-chat.cabal b/simplex-chat.cabal index 24510ebc70..b9ec8c9f00 100644 --- a/simplex-chat.cabal +++ b/simplex-chat.cabal @@ -130,6 +130,7 @@ library Simplex.Chat.Store.Postgres.Migrations.M20260122_has_link Simplex.Chat.Store.Postgres.Migrations.M20260222_chat_relays Simplex.Chat.Store.Postgres.Migrations.M20260403_item_viewed + Simplex.Chat.Store.Postgres.Migrations.M20260429_relay_request_retries else exposed-modules: Simplex.Chat.Archive @@ -282,6 +283,7 @@ library Simplex.Chat.Store.SQLite.Migrations.M20260122_has_link Simplex.Chat.Store.SQLite.Migrations.M20260222_chat_relays Simplex.Chat.Store.SQLite.Migrations.M20260403_item_viewed + Simplex.Chat.Store.SQLite.Migrations.M20260429_relay_request_retries other-modules: Paths_simplex_chat hs-source-dirs: diff --git a/src/Simplex/Chat.hs b/src/Simplex/Chat.hs index 2671774603..c5f17e5d69 100644 --- a/src/Simplex/Chat.hs +++ b/src/Simplex/Chat.hs @@ -10,6 +10,7 @@ {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE TupleSections #-} {-# OPTIONS_GHC -fno-warn-ambiguous-fields #-} @@ -26,7 +27,7 @@ import qualified Data.List.NonEmpty as L import qualified Data.Map.Strict as M import Data.Maybe (fromMaybe, mapMaybe) import Data.Text (Text) -import Data.Time.Clock (getCurrentTime) +import Data.Time.Clock (getCurrentTime, nominalDay) import Simplex.Chat.Controller import Simplex.Chat.Library.Commands import Simplex.Chat.Operators @@ -42,6 +43,7 @@ import Simplex.Chat.Util (shuffle) import Simplex.FileTransfer.Client.Presets (defaultXFTPServers) import Simplex.Messaging.Agent import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), InitialAgentServers (..), ServerCfg (..), allRoles, createAgentStore, defaultAgentConfig, presetServerCfg) +import Simplex.Messaging.Agent.RetryInterval (RetryInterval (..)) import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.Store.Common (DBStore (dbNew)) import qualified Simplex.Messaging.Agent.Store.DB as DB @@ -115,6 +117,8 @@ defaultChatConfig = deliveryWorkerDelay = 0, deliveryBucketSize = 10000, channelSubscriberRole = GRObserver, + relayRequestRetryInterval = RetryInterval {initialInterval = 5_000000, increaseAfter = 0, maxInterval = 600_000000}, + relayRequestExpiry = (10, nominalDay), deviceNameForRemote = "", remoteCompression = True, chatHooks = defaultChatHooks diff --git a/src/Simplex/Chat/Controller.hs b/src/Simplex/Chat/Controller.hs index 0b263a6fa2..cfb60c360a 100644 --- a/src/Simplex/Chat/Controller.hs +++ b/src/Simplex/Chat/Controller.hs @@ -73,6 +73,7 @@ import Simplex.Messaging.Agent (AgentClient, DatabaseDiff, SubscriptionsInfo) import Simplex.Messaging.Agent.Client (AgentLocks, AgentQueuesInfo (..), AgentWorkersDetails (..), AgentWorkersSummary (..), ProtocolTestFailure, SMPServerSubs, ServerQueueInfo, UserNetworkInfo) import Simplex.Messaging.Agent.Env.SQLite (AgentConfig, NetworkConfig, ServerCfg, Worker) import Simplex.Messaging.Agent.Lock +import Simplex.Messaging.Agent.RetryInterval (RetryInterval (..)) import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.Store.Common (DBStore, withTransaction, withTransactionPriority) import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation, UpMigration) @@ -158,6 +159,8 @@ data ChatConfig = ChatConfig deliveryWorkerDelay :: Int64, -- microseconds deliveryBucketSize :: Int, channelSubscriberRole :: GroupMemberRole, -- TODO [relays] starting role should be communicated in protocol from owner to relays + relayRequestRetryInterval :: RetryInterval, + relayRequestExpiry :: (Int, NominalDiffTime), highlyAvailable :: Bool, deviceNameForRemote :: Text, remoteCompression :: Bool, diff --git a/src/Simplex/Chat/Library/Subscriber.hs b/src/Simplex/Chat/Library/Subscriber.hs index 019d1e0fd7..48dd63f6cf 100644 --- a/src/Simplex/Chat/Library/Subscriber.hs +++ b/src/Simplex/Chat/Library/Subscriber.hs @@ -37,7 +37,7 @@ import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing, mapMaybe) import Data.Text (Text) import qualified Data.Text as T import Data.Text.Encoding (decodeLatin1) -import Data.Time.Clock (UTCTime, diffUTCTime, getCurrentTime) +import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, diffUTCTime, getCurrentTime) import qualified Data.UUID as UUID import qualified Data.UUID.V4 as V4 import Data.Word (Word32) @@ -77,7 +77,7 @@ import Simplex.Messaging.Agent.Client (getAgentWorker, temporaryOrHostError, wai import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), Worker (..)) import Simplex.Messaging.Agent.Protocol import qualified Simplex.Messaging.Agent.Protocol as AP (AgentErrorType (..)) -import Simplex.Messaging.Agent.RetryInterval (withRetryInterval) +import Simplex.Messaging.Agent.RetryInterval (RetryInterval (..), nextRetryDelay) import qualified Simplex.Messaging.Agent.Store.DB as DB import Simplex.Messaging.Client (NetworkRequestMode (..), ProxyClientError (..)) import qualified Simplex.Messaging.Crypto as C @@ -94,8 +94,9 @@ import Simplex.Messaging.Transport (TransportError (..)) import Simplex.Messaging.Util import Simplex.Messaging.Version import qualified System.FilePath as FP +import System.Mem.Weak (Weak) import Text.Read (readMaybe) -import UnliftIO.Concurrent (forkIO) +import UnliftIO.Concurrent (ThreadId, forkIO, mkWeakThreadId) import UnliftIO.Directory import UnliftIO.STM @@ -1492,7 +1493,8 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage = toViewTE $ TERejectingGroupJoinRequestMember user gInfo mem rjctReason xGrpRelayInv :: InvitationId -> VersionRangeChat -> GroupRelayInvitation -> CM () xGrpRelayInv invId chatVRange groupRelayInv = do - (_gInfo, _ownerMember) <- withStore $ \db -> createRelayRequestGroup db vr user groupRelayInv invId chatVRange + initialDelay <- asks $ initialInterval . relayRequestRetryInterval . config + (_gInfo, _ownerMember) <- withStore $ \db -> createRelayRequestGroup db vr user groupRelayInv invId chatVRange initialDelay lift $ void $ getRelayRequestWorker True xGrpRelayTest :: InvitationId -> VersionRangeChat -> ByteString -> CM () xGrpRelayTest invId chatVRange challenge = do @@ -3710,23 +3712,55 @@ runRelayRequestWorker a Worker {doWork} = do user <- getRelayUser db UserContactLink {userContactLinkId} <- getUserAddress db user pure (user, userContactLinkId) + delayThreads <- liftIO TM.emptyIO forever $ do lift $ waitForWork doWork - runRelayRequestOperation vr user uclId + runRelayRequestOperation delayThreads vr user uclId where - runRelayRequestOperation :: VersionRangeChat -> User -> Int64 -> CM () - runRelayRequestOperation vr user uclId = - withWork_ a doWork (withStore' getNextPendingRelayRequest) $ + runRelayRequestOperation :: TM.TMap GroupId (TMVar (Weak ThreadId)) -> VersionRangeChat -> User -> Int64 -> CM () + runRelayRequestOperation delayThreads vr user uclId = + withWork_ a doWork getReadyRelayRequest $ \(groupId, rrd) -> do - ri <- asks $ reconnectInterval . agentConfig . config - withRetryInterval ri $ \_ loop -> do - liftIO $ waitWhileSuspended a - liftIO $ waitForUserNetwork a - processRelayRequest groupId rrd `catchAllErrors` retryTmpError loop groupId + ChatConfig {relayRequestExpiry} <- asks config + liftIO $ waitWhileSuspended a + liftIO $ waitForUserNetwork a + processRelayRequest groupId rrd `catchAllErrors` retryTmpError relayRequestExpiry groupId rrd where - retryTmpError :: CM () -> GroupId -> ChatError -> CM () - retryTmpError loop groupId = \case - ChatErrorAgent {agentError} | temporaryOrHostError agentError -> loop + getReadyRelayRequest :: CM (Either StoreError (Maybe (GroupId, RelayRequestData))) + getReadyRelayRequest = + withStore' getNextPendingRelayRequest >>= \case + Right (Just (groupId, rrd@RelayRequestData {reqExecuteAt})) -> do + currentTs <- liftIO getCurrentTime + let delay = diffUTCTime reqExecuteAt currentTs + if delay <= 1 + then pure $ Right (Just (groupId, rrd)) + else Right Nothing <$ scheduleRequest groupId delay + r -> pure r + scheduleRequest :: GroupId -> NominalDiffTime -> CM () + scheduleRequest groupId delay = do + v_ <- liftIO $ atomically $ + ifM + (isNothing <$> TM.lookup groupId delayThreads) + (newEmptyTMVar >>= \v -> TM.insert groupId v delayThreads $> Just v) + (pure Nothing) + forM_ v_ $ \v -> do + tId <- liftIO $ forkIO $ do + threadDelay' $ diffToMicroseconds delay + atomically $ TM.delete groupId delayThreads + void $ atomically $ tryPutTMVar doWork () + weakTId <- liftIO $ mkWeakThreadId tId + liftIO $ atomically $ putTMVar v weakTId + retryTmpError :: (Int, NominalDiffTime) -> GroupId -> RelayRequestData -> ChatError -> CM () + retryTmpError (retriesThreshold, ttl) groupId RelayRequestData {reqDelay, reqRetries, reqCreatedAt} = \case + ChatErrorAgent {agentError} | temporaryOrHostError agentError -> do + currentTs <- liftIO getCurrentTime + if reqRetries >= retriesThreshold && diffUTCTime currentTs reqCreatedAt >= ttl + then withStore' $ \db -> setRelayRequestErr db groupId "expired" + else do + ri <- asks $ relayRequestRetryInterval . config + let executeAt = addUTCTime (fromIntegral reqDelay / 1000000) currentTs + nextDelay = nextRetryDelay 0 reqDelay ri + withStore' $ \db -> updateRelayRequestRetries db groupId nextDelay executeAt e -> do withStore' $ \db -> setRelayRequestErr db groupId (tshow e) eToView e diff --git a/src/Simplex/Chat/Store/Groups.hs b/src/Simplex/Chat/Store/Groups.hs index 93db63ee71..b375f77eb0 100644 --- a/src/Simplex/Chat/Store/Groups.hs +++ b/src/Simplex/Chat/Store/Groups.hs @@ -1515,8 +1515,8 @@ setGroupInProgressDone db GroupInfo {groupId} = do "UPDATE groups SET creating_in_progress = 0, updated_at = ? WHERE group_id = ?" (currentTs, groupId) -createRelayRequestGroup :: DB.Connection -> VersionRangeChat -> User -> GroupRelayInvitation -> InvitationId -> VersionRangeChat -> ExceptT StoreError IO (GroupInfo, GroupMember) -createRelayRequestGroup db vr user@User {userId} GroupRelayInvitation {fromMember, fromMemberProfile, relayMemberId, groupLink} invId reqChatVRange = do +createRelayRequestGroup :: DB.Connection -> VersionRangeChat -> User -> GroupRelayInvitation -> InvitationId -> VersionRangeChat -> Int64 -> ExceptT StoreError IO (GroupInfo, GroupMember) +createRelayRequestGroup db vr user@User {userId} GroupRelayInvitation {fromMember, fromMemberProfile, relayMemberId, groupLink} invId reqChatVRange initialDelay = do currentTs <- liftIO getCurrentTime -- Create group with placeholder profile let Profile {displayName = fromMemberLDN} = fromMemberProfile @@ -1532,7 +1532,7 @@ createRelayRequestGroup db vr user@User {userId} GroupRelayInvitation {fromMembe } (groupId, _groupLDN) <- createGroup_ db userId placeholderProfile Nothing Nothing True (Just RSInvited) Nothing currentTs -- Store relay request data for recovery - liftIO $ setRelayRequestData_ groupId + liftIO $ setRelayRequestData_ groupId currentTs ownerMemberId <- insertOwner_ currentTs groupId let relayMember = MemberIdRole relayMemberId GRRelay -- TODO [member keys] should relays use member keys? @@ -1541,7 +1541,7 @@ createRelayRequestGroup db vr user@User {userId} GroupRelayInvitation {fromMembe g <- getGroupInfo db vr user groupId pure (g, ownerMember) where - setRelayRequestData_ groupId = + setRelayRequestData_ groupId currentTs = DB.execute db [sql| @@ -1549,10 +1549,12 @@ createRelayRequestGroup db vr user@User {userId} GroupRelayInvitation {fromMembe SET relay_request_inv_id = ?, relay_request_group_link = ?, relay_request_peer_chat_min_version = ?, - relay_request_peer_chat_max_version = ? + relay_request_peer_chat_max_version = ?, + relay_request_delay = ?, + relay_request_execute_at = ? WHERE group_id = ? |] - (Binary invId, groupLink, minVersion reqChatVRange, maxVersion reqChatVRange, groupId) + (Binary invId, groupLink, minVersion reqChatVRange, maxVersion reqChatVRange, initialDelay, currentTs, groupId) insertOwner_ currentTs groupId = do let MemberIdRole {memberId, memberRole} = fromMember VersionRange minV maxV = reqChatVRange diff --git a/src/Simplex/Chat/Store/Postgres/Migrations.hs b/src/Simplex/Chat/Store/Postgres/Migrations.hs index 06efcdc17a..cdb461ea70 100644 --- a/src/Simplex/Chat/Store/Postgres/Migrations.hs +++ b/src/Simplex/Chat/Store/Postgres/Migrations.hs @@ -28,6 +28,7 @@ import Simplex.Chat.Store.Postgres.Migrations.M20260108_chat_indices import Simplex.Chat.Store.Postgres.Migrations.M20260122_has_link import Simplex.Chat.Store.Postgres.Migrations.M20260222_chat_relays import Simplex.Chat.Store.Postgres.Migrations.M20260403_item_viewed +import Simplex.Chat.Store.Postgres.Migrations.M20260429_relay_request_retries import Simplex.Messaging.Agent.Store.Shared (Migration (..)) schemaMigrations :: [(String, Text, Maybe Text)] @@ -55,7 +56,8 @@ schemaMigrations = ("20260108_chat_indices", m20260108_chat_indices, Just down_m20260108_chat_indices), ("20260122_has_link", m20260122_has_link, Just down_m20260122_has_link), ("20260222_chat_relays", m20260222_chat_relays, Just down_m20260222_chat_relays), - ("20260403_item_viewed", m20260403_item_viewed, Just down_m20260403_item_viewed) + ("20260403_item_viewed", m20260403_item_viewed, Just down_m20260403_item_viewed), + ("20260429_relay_request_retries", m20260429_relay_request_retries, Just down_m20260429_relay_request_retries) ] -- | The list of migrations in ascending order by date diff --git a/src/Simplex/Chat/Store/Postgres/Migrations/M20260429_relay_request_retries.hs b/src/Simplex/Chat/Store/Postgres/Migrations/M20260429_relay_request_retries.hs new file mode 100644 index 0000000000..f143275f38 --- /dev/null +++ b/src/Simplex/Chat/Store/Postgres/Migrations/M20260429_relay_request_retries.hs @@ -0,0 +1,23 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE QuasiQuotes #-} + +module Simplex.Chat.Store.Postgres.Migrations.M20260429_relay_request_retries where + +import Data.Text (Text) +import Text.RawString.QQ (r) + +m20260429_relay_request_retries :: Text +m20260429_relay_request_retries = + [r| +ALTER TABLE groups ADD COLUMN relay_request_retries BIGINT NOT NULL DEFAULT 0; +ALTER TABLE groups ADD COLUMN relay_request_delay BIGINT NOT NULL DEFAULT 0; +ALTER TABLE groups ADD COLUMN relay_request_execute_at TIMESTAMPTZ NOT NULL DEFAULT (now()); +|] + +down_m20260429_relay_request_retries :: Text +down_m20260429_relay_request_retries = + [r| +ALTER TABLE groups DROP COLUMN relay_request_retries; +ALTER TABLE groups DROP COLUMN relay_request_delay; +ALTER TABLE groups DROP COLUMN relay_request_execute_at; +|] diff --git a/src/Simplex/Chat/Store/Postgres/Migrations/chat_schema.sql b/src/Simplex/Chat/Store/Postgres/Migrations/chat_schema.sql index 41e6f0ed61..f1470ae560 100644 --- a/src/Simplex/Chat/Store/Postgres/Migrations/chat_schema.sql +++ b/src/Simplex/Chat/Store/Postgres/Migrations/chat_schema.sql @@ -959,7 +959,10 @@ CREATE TABLE test_chat_schema.groups ( root_priv_key bytea, root_pub_key bytea, member_priv_key bytea, - public_member_count bigint + public_member_count bigint, + relay_request_retries bigint DEFAULT 0 NOT NULL, + relay_request_delay bigint DEFAULT 0 NOT NULL, + relay_request_execute_at timestamp with time zone DEFAULT now() NOT NULL ); diff --git a/src/Simplex/Chat/Store/RelayRequests.hs b/src/Simplex/Chat/Store/RelayRequests.hs index 3858281878..2e590a1696 100644 --- a/src/Simplex/Chat/Store/RelayRequests.hs +++ b/src/Simplex/Chat/Store/RelayRequests.hs @@ -9,13 +9,15 @@ module Simplex.Chat.Store.RelayRequests ( hasPendingRelayRequests, getNextPendingRelayRequest, + updateRelayRequestRetries, setRelayRequestErr, ) where +import Data.Int (Int64) import Data.Maybe (fromMaybe) import Data.Text (Text) -import Data.Time.Clock (getCurrentTime) +import Data.Time.Clock (UTCTime, getCurrentTime) import Simplex.Chat.Store.Shared import Simplex.Chat.Types import Simplex.Chat.Types.Shared @@ -64,7 +66,7 @@ getNextPendingRelayRequest db = WHERE relay_own_status = ? AND relay_request_failed = 0 AND relay_request_err_reason IS NULL - ORDER BY group_id ASC + ORDER BY relay_request_execute_at ASC LIMIT 1 |] (Only RSInvited) @@ -76,18 +78,27 @@ getNextPendingRelayRequest db = [sql| SELECT relay_request_inv_id, relay_request_group_link, - relay_request_peer_chat_min_version, relay_request_peer_chat_max_version + relay_request_peer_chat_min_version, relay_request_peer_chat_max_version, + relay_request_delay, relay_request_retries, created_at, relay_request_execute_at FROM groups WHERE group_id = ? |] (Only groupId) where - toRelayRequestData :: (Maybe InvitationId, Maybe ShortLinkContact, Maybe VersionChat, Maybe VersionChat) -> Either StoreError (GroupId, RelayRequestData) + toRelayRequestData :: (Maybe InvitationId, Maybe ShortLinkContact, Maybe VersionChat, Maybe VersionChat, Int64, Int, UTCTime, UTCTime) -> Either StoreError (GroupId, RelayRequestData) toRelayRequestData = \case - (Just relayInvId, Just reqGroupLink, Just minV, Just maxV) -> - Right (groupId, RelayRequestData {relayInvId, reqGroupLink, reqChatVRange = fromMaybe (versionToRange maxV) $ safeVersionRange minV maxV}) + (Just relayInvId, Just reqGroupLink, Just minV, Just maxV, reqDelay, reqRetries, reqCreatedAt, reqExecuteAt) -> + Right (groupId, RelayRequestData {relayInvId, reqGroupLink, reqChatVRange = fromMaybe (versionToRange maxV) $ safeVersionRange minV maxV, reqDelay, reqRetries, reqCreatedAt, reqExecuteAt}) _ -> Left $ SEInternalError "missing relay request data" +updateRelayRequestRetries :: DB.Connection -> GroupId -> Int64 -> UTCTime -> IO () +updateRelayRequestRetries db groupId delay executeAt = do + currentTs <- getCurrentTime + DB.execute + db + "UPDATE groups SET relay_request_retries = relay_request_retries + 1, relay_request_delay = ?, relay_request_execute_at = ?, updated_at = ? WHERE group_id = ?" + (delay, executeAt, currentTs, groupId) + markRelayRequestFailed :: DB.Connection -> GroupId -> IO () markRelayRequestFailed db groupId = do currentTs <- getCurrentTime diff --git a/src/Simplex/Chat/Store/SQLite/Migrations.hs b/src/Simplex/Chat/Store/SQLite/Migrations.hs index 607e0549b1..0ab8911ffd 100644 --- a/src/Simplex/Chat/Store/SQLite/Migrations.hs +++ b/src/Simplex/Chat/Store/SQLite/Migrations.hs @@ -151,6 +151,7 @@ import Simplex.Chat.Store.SQLite.Migrations.M20260108_chat_indices import Simplex.Chat.Store.SQLite.Migrations.M20260122_has_link import Simplex.Chat.Store.SQLite.Migrations.M20260222_chat_relays import Simplex.Chat.Store.SQLite.Migrations.M20260403_item_viewed +import Simplex.Chat.Store.SQLite.Migrations.M20260429_relay_request_retries import Simplex.Messaging.Agent.Store.Shared (Migration (..)) schemaMigrations :: [(String, Query, Maybe Query)] @@ -301,7 +302,8 @@ schemaMigrations = ("20260108_chat_indices", m20260108_chat_indices, Just down_m20260108_chat_indices), ("20260122_has_link", m20260122_has_link, Just down_m20260122_has_link), ("20260222_chat_relays", m20260222_chat_relays, Just down_m20260222_chat_relays), - ("20260403_item_viewed", m20260403_item_viewed, Just down_m20260403_item_viewed) + ("20260403_item_viewed", m20260403_item_viewed, Just down_m20260403_item_viewed), + ("20260429_relay_request_retries", m20260429_relay_request_retries, Just down_m20260429_relay_request_retries) ] -- | The list of migrations in ascending order by date diff --git a/src/Simplex/Chat/Store/SQLite/Migrations/M20260429_relay_request_retries.hs b/src/Simplex/Chat/Store/SQLite/Migrations/M20260429_relay_request_retries.hs new file mode 100644 index 0000000000..13216a5d4c --- /dev/null +++ b/src/Simplex/Chat/Store/SQLite/Migrations/M20260429_relay_request_retries.hs @@ -0,0 +1,22 @@ +{-# LANGUAGE QuasiQuotes #-} + +module Simplex.Chat.Store.SQLite.Migrations.M20260429_relay_request_retries where + +import Database.SQLite.Simple (Query) +import Database.SQLite.Simple.QQ (sql) + +m20260429_relay_request_retries :: Query +m20260429_relay_request_retries = + [sql| +ALTER TABLE groups ADD COLUMN relay_request_retries INTEGER NOT NULL DEFAULT 0; +ALTER TABLE groups ADD COLUMN relay_request_delay INTEGER NOT NULL DEFAULT 0; +ALTER TABLE groups ADD COLUMN relay_request_execute_at TEXT NOT NULL DEFAULT(datetime('now')); +|] + +down_m20260429_relay_request_retries :: Query +down_m20260429_relay_request_retries = + [sql| +ALTER TABLE groups DROP COLUMN relay_request_retries; +ALTER TABLE groups DROP COLUMN relay_request_delay; +ALTER TABLE groups DROP COLUMN relay_request_execute_at; +|] diff --git a/src/Simplex/Chat/Store/SQLite/Migrations/chat_query_plans.txt b/src/Simplex/Chat/Store/SQLite/Migrations/chat_query_plans.txt index 99880b23d7..7a226bf78a 100644 --- a/src/Simplex/Chat/Store/SQLite/Migrations/chat_query_plans.txt +++ b/src/Simplex/Chat/Store/SQLite/Migrations/chat_query_plans.txt @@ -695,7 +695,8 @@ SEARCH delivery_jobs USING INTEGER PRIMARY KEY (rowid=?) Query: SELECT relay_request_inv_id, relay_request_group_link, - relay_request_peer_chat_min_version, relay_request_peer_chat_max_version + relay_request_peer_chat_min_version, relay_request_peer_chat_max_version, + relay_request_delay, relay_request_retries, created_at, relay_request_execute_at FROM groups WHERE group_id = ? @@ -993,11 +994,12 @@ Query: WHERE relay_own_status = ? AND relay_request_failed = 0 AND relay_request_err_reason IS NULL - ORDER BY group_id ASC + ORDER BY relay_request_execute_at ASC LIMIT 1 Plan: SCAN groups +USE TEMP B-TREE FOR ORDER BY Query: SELECT i.chat_item_id @@ -1775,7 +1777,9 @@ Query: SET relay_request_inv_id = ?, relay_request_group_link = ?, relay_request_peer_chat_min_version = ?, - relay_request_peer_chat_max_version = ? + relay_request_peer_chat_max_version = ?, + relay_request_delay = ?, + relay_request_execute_at = ? WHERE group_id = ? Plan: diff --git a/src/Simplex/Chat/Store/SQLite/Migrations/chat_schema.sql b/src/Simplex/Chat/Store/SQLite/Migrations/chat_schema.sql index 36f373f193..95a92eb7f0 100644 --- a/src/Simplex/Chat/Store/SQLite/Migrations/chat_schema.sql +++ b/src/Simplex/Chat/Store/SQLite/Migrations/chat_schema.sql @@ -173,7 +173,10 @@ CREATE TABLE groups( root_priv_key BLOB, root_pub_key BLOB, member_priv_key BLOB, - public_member_count INTEGER, -- received + public_member_count INTEGER, + relay_request_retries INTEGER NOT NULL DEFAULT 0, + relay_request_delay INTEGER NOT NULL DEFAULT 0, + relay_request_execute_at TEXT NOT NULL DEFAULT(datetime('now')), -- received FOREIGN KEY(user_id, local_display_name) REFERENCES display_names(user_id, local_display_name) ON DELETE CASCADE diff --git a/src/Simplex/Chat/Types.hs b/src/Simplex/Chat/Types.hs index b4acaedd39..b068e6b679 100644 --- a/src/Simplex/Chat/Types.hs +++ b/src/Simplex/Chat/Types.hs @@ -1045,7 +1045,11 @@ data GroupMember = GroupMember data RelayRequestData = RelayRequestData { relayInvId :: InvitationId, reqGroupLink :: ShortLinkContact, - reqChatVRange :: VersionRangeChat + reqChatVRange :: VersionRangeChat, + reqDelay :: Int64, + reqRetries :: Int, + reqCreatedAt :: UTCTime, + reqExecuteAt :: UTCTime } deriving (Eq, Show) diff --git a/tests/ChatTests/Groups.hs b/tests/ChatTests/Groups.hs index e4442d8e0b..21bf6cb65a 100644 --- a/tests/ChatTests/Groups.hs +++ b/tests/ChatTests/Groups.hs @@ -235,7 +235,6 @@ chatGroupTests = do it "should respect support preference in channel" testSupportPreferenceChannel -- TODO [relays] add tests for channels -- TODO - tests with delivery loop over members restored after restart - -- TODO - delivery in support scopes inside channels -- TODO - connect plans for relay groups -- TODO - cancellation on failure to create relay group (for owner) -- TODO - async retry connecting to relay (for members)