mirror of
https://github.com/simplex-chat/simplex-chat.git
synced 2026-05-25 20:44:38 +00:00
core: fix relay request worker retry limit (#6931)
* core: fix relay request worker retry limit
* update plan
* update plan
* update plan
* wip
* schema
* wip
* wip
* schema
* update
* remove comment
* rework
* schema
* update
* schema
* update
* plans
* corrections
* add 1 second
* remove + 1
* add +1 to schedule
* changes
* updated schemas

---------

Co-authored-by: Evgeny @ SimpleX Chat <259188159+evgeny-simplex@users.noreply.github.com>
Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>
@@ -0,0 +1,203 @@
|
||||
# Plan: Relay Request Worker Retry Limit
|
||||
|
||||
## Context
|
||||
|
||||
The relay request worker (`runRelayRequestWorker`) processes channel setup requests sequentially using a single worker (`relayRequestWorkerKey = 1`). When a request requires network calls to an unreachable server (e.g., fetching group link data via `getShortLinkConnReq'`), the worker retries indefinitely via `withRetryInterval` + `retryTmpError`: on temporary or host errors, `retryTmpError` calls `loop` with no retry limit. While the worker is stuck on that request, no subsequent relay request is processed.
|
||||
|
||||
This is an attack vector: a channel owner can create a channel link on a server unreachable by the relay, causing the relay request worker to retry forever and blocking all other channel setup requests.
|
||||
|
||||
## Approach
|
||||
|
||||
Follow the XFTP worker retry pattern (`runXFTPDelWorker` in `simplexmq/src/Simplex/FileTransfer/Agent.hs:667`):
|
||||
|
||||
1. **Track retries and delay in DB**: Add `relay_request_retries` and `relay_request_delay` columns to the `groups` table
2. **Order by retries**: Query for next work item ordered by `relay_request_retries ASC, created_at ASC`, so items with fewer retries are processed first and stuck items get pushed to the back
3. **Limit consecutive retries**: Replace `withRetryInterval` with `withRetryIntervalCount`, limiting to a small number of consecutive retries per pickup cycle (3, matching XFTP's `xftpConsecutiveRetries`). After the limit, the worker yields and picks the next item.
4. **Store delay for resumption**: On each retry, store the current backoff delay in DB. On next pickup, resume backoff from the stored delay (XFTP pattern: `ri {initialInterval = d, increaseAfter = 0}`)
5. **Expire old requests**: On temp error, before retrying, check if the request is older than 1 day and has 10+ retries; if so, mark it as failed instead of retrying. Both conditions must hold: a request that's old but has few retries may just have been delayed, while a request with many retries that's recent is still being actively worked on.
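
Steps 4 and 5 can be sketched in isolation. This is a minimal model, not the code from the codebase: it assumes a cut-down `RetryInterval` record with only the three fields mentioned in this plan, and it measures request age in plain seconds instead of `UTCTime`. The names `resumeFrom` and `isExpired` are hypothetical.

```haskell
import Data.Int (Int64)

-- Simplified stand-in for the agent's RetryInterval
-- (assumption: only these three fields matter for the sketch).
data RetryInterval = RetryInterval
  { initialInterval :: Int64, -- microseconds
    increaseAfter :: Int64,
    maxInterval :: Int64
  }
  deriving (Eq, Show)

-- Step 4: if a delay was stored on a previous pickup, resume backoff from it;
-- otherwise use the configured interval unchanged.
resumeFrom :: Maybe Int64 -> RetryInterval -> RetryInterval
resumeFrom storedDelay ri =
  maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) storedDelay

-- Step 5: a request expires only when BOTH thresholds are crossed
-- (10+ retries and older than one day, here 86400 seconds).
isExpired :: Int -> Int -> Bool
isExpired retries ageSeconds = retries >= 10 && ageSeconds > 86400
```

Under these assumptions, a request that is two days old with 3 retries is not expired, and neither is a one-minute-old request with 50 retries; only the combination marks it failed.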
|
||||
|
||||
### How this neutralizes the attack
|
||||
|
||||
- Attacker's request gets picked up, retried 3 times with backoff (~15s total), then yielded
- Worker picks the next item by retry count, so legitimate requests (retries=0) go first
- Attacker's request accumulates retries, always processed last
- After 1 day and 10+ retries, the request is marked failed and permanently excluded
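
The ordering effect described above can be illustrated with a small in-memory model. This is a sketch only: `Pending`, `nextPending` and `failCycle` are hypothetical names, `createdAt` is a plain integer, and a failed pickup cycle is assumed to add exactly 3 retries.

```haskell
import Data.List (sortOn)

-- Hypothetical in-memory stand-in for a pending row in the groups table.
data Pending = Pending
  { groupId :: Int,
    retries :: Int,
    createdAt :: Int -- seconds since an arbitrary epoch, for illustration only
  }
  deriving (Eq, Show)

-- Mirrors ORDER BY relay_request_retries ASC, created_at ASC LIMIT 1.
nextPending :: [Pending] -> Maybe Pending
nextPending ps = case sortOn (\p -> (retries p, createdAt p)) ps of
  [] -> Nothing
  p : _ -> Just p

-- One failed pickup cycle: the given item accumulates 3 more retries.
failCycle :: Int -> [Pending] -> [Pending]
failCycle gId = map bump
  where
    bump p
      | groupId p == gId = p {retries = retries p + 3}
      | otherwise = p
```

With a queue `[Pending 1 0 100, Pending 2 0 200]`, group 1 is picked first because it is older. After `failCycle 1`, group 2 (still at 0 retries) is picked before group 1, which is the behavior the plan relies on.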
|
||||
|
||||
---
|
||||
|
||||
## Detailed changes
|
||||
|
||||
### 1. Database migration
|
||||
|
||||
New migration: `M20260429_relay_request_retries.hs`
|
||||
|
||||
```sql
ALTER TABLE groups ADD COLUMN relay_request_retries INTEGER NOT NULL DEFAULT 0;
ALTER TABLE groups ADD COLUMN relay_request_delay INTEGER;
```
|
||||
|
||||
**Files:**
|
||||
- `src/Simplex/Chat/Store/SQLite/Migrations/M20260429_relay_request_retries.hs` (new)
- `src/Simplex/Chat/Store/SQLite/Migrations.hs` (register)
- `src/Simplex/Chat/Store/Postgres/Migrations/M20260429_relay_request_retries.hs` (new)
- `src/Simplex/Chat/Store/Postgres/Migrations.hs` (register)
- `simplex-chat.cabal` (add modules)
|
||||
|
||||
### 2. Extend RelayRequestData
|
||||
|
||||
**File:** `src/Simplex/Chat/Types.hs`
|
||||
|
||||
```haskell
data RelayRequestData = RelayRequestData
  { relayInvId :: InvitationId,
    reqGroupLink :: ShortLinkContact,
    reqChatVRange :: VersionRangeChat,
    relayRequestDelay :: Maybe Int64,
    relayRequestRetries :: Int,
    relayRequestCreatedAt :: UTCTime
  }
```
|
||||
|
||||
- `relayRequestDelay`: resume backoff from stored position (XFTP pattern)
- `relayRequestRetries`: current retry count, used with `relayRequestCreatedAt` to decide expiry in `retryTmpError`
- `relayRequestCreatedAt`: group creation time, used for the 1-day expiry check
|
||||
|
||||
### 3. Update store functions
|
||||
|
||||
**File:** `src/Simplex/Chat/Store/RelayRequests.hs`
|
||||
|
||||
**`getNextPendingRelayRequest`** (two changes):
- Order by `relay_request_retries ASC, created_at ASC` instead of `group_id ASC`
- SELECT and return `relay_request_delay`, `relay_request_retries`, `created_at` in the data query
|
||||
|
||||
```haskell
getNextPendingRelayRequest db =
  getWorkItem "relay request" getNextRequestGroupId getRelayRequestData (markRelayRequestFailed db)
  where
    getNextRequestGroupId =
      maybeFirstRow fromOnly $
        DB.query db
          [sql|
            SELECT group_id FROM groups
            WHERE relay_own_status = ?
              AND relay_request_failed = 0
              AND relay_request_err_reason IS NULL
            ORDER BY relay_request_retries ASC, created_at ASC
            LIMIT 1
          |]
          (Only RSInvited)
    getRelayRequestData groupId =
      firstRow' toRelayRequestData (SEGroupNotFound groupId) $
        DB.query db
          [sql|
            SELECT relay_request_inv_id, relay_request_group_link,
              relay_request_peer_chat_min_version, relay_request_peer_chat_max_version,
              relay_request_delay, relay_request_retries, created_at
            FROM groups WHERE group_id = ?
          |]
          (Only groupId)
      where
        toRelayRequestData (Just relayInvId, Just reqGroupLink, Just minV, Just maxV, relayRequestDelay, relayRequestRetries, relayRequestCreatedAt) =
          Right (groupId, RelayRequestData {relayInvId, reqGroupLink, reqChatVRange = fromMaybe (versionToRange maxV) $ safeVersionRange minV maxV, relayRequestDelay, relayRequestRetries, relayRequestCreatedAt})
        toRelayRequestData _ = Left $ SEInternalError "missing relay request data"
```
|
||||
|
||||
**New function: `updateRelayRequestRetries`**:
|
||||
|
||||
```haskell
updateRelayRequestRetries :: DB.Connection -> GroupId -> Int64 -> IO ()
updateRelayRequestRetries db groupId delay = do
  currentTs <- getCurrentTime
  DB.execute db
    "UPDATE groups SET relay_request_retries = relay_request_retries + 1, relay_request_delay = ?, updated_at = ? WHERE group_id = ?"
    (delay, currentTs, groupId)
```
|
||||
|
||||
Export `updateRelayRequestRetries` and `markRelayRequestFailed` from module (the latter is currently internal, used only as a callback in `getWorkItem`).
|
||||
|
||||
### 4. Worker changes
|
||||
|
||||
**File:** `src/Simplex/Chat/Library/Subscriber.hs`
|
||||
|
||||
**Import change**: Add `withRetryIntervalCount` to the import from `Simplex.Messaging.Agent.RetryInterval`.
|
||||
|
||||
**Replace `withRetryInterval` with limited retry** in `runRelayRequestOperation`:
|
||||
|
||||
```haskell
runRelayRequestOperation vr user uclId =
  withWork_ a doWork (withStore' getNextPendingRelayRequest) $
    \(groupId, rrd@RelayRequestData {relayRequestDelay}) -> do
      ri <- asks $ reconnectInterval . agentConfig . config
      let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) relayRequestDelay
      withRetryIntervalLimit ri' $ \delay loop -> do
        liftIO $ waitWhileSuspended a
        liftIO $ waitForUserNetwork a
        processRelayRequest groupId rrd `catchAllErrors` retryTmpError loop groupId rrd delay
  where
    maxConsecutiveRetries :: Int
    maxConsecutiveRetries = 3
    withRetryIntervalLimit :: RetryInterval -> (Int64 -> CM () -> CM ()) -> CM ()
    withRetryIntervalLimit ri action =
      withRetryIntervalCount ri $ \n delay loop ->
        when (n < maxConsecutiveRetries) $ action delay loop
    retryTmpError :: CM () -> GroupId -> RelayRequestData -> Int64 -> ChatError -> CM ()
    retryTmpError loop groupId RelayRequestData {relayRequestRetries, relayRequestCreatedAt} delay = \case
      ChatErrorAgent {agentError} | temporaryOrHostError agentError -> do
        currentTs <- liftIO getCurrentTime
        if relayRequestRetries >= 10 && diffUTCTime currentTs relayRequestCreatedAt > nominalDay
          then withStore' $ \db -> markRelayRequestFailed db groupId
          else do
            withStore' $ \db -> updateRelayRequestRetries db groupId delay
            loop
      e -> do
        withStore' $ \db -> setRelayRequestErr db groupId (tshow e)
        eToView e
```
|
||||
|
||||
Key changes from current code:
- `withRetryInterval` → `withRetryIntervalCount` wrapped in local `withRetryIntervalLimit`
- Resume from stored delay via `ri'` (XFTP pattern)
- `retryTmpError` receives the full `RelayRequestData` record and destructures the fields it needs
- On temp error: checks if request is older than 1 day with 10+ retries; if so, marks as failed instead of retrying; otherwise increments retries and calls `loop`
- After `maxConsecutiveRetries` (3), the `when` guard exits, worker picks next item
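
The effect of the `when` guard can be checked with a simplified model of the retry loop. This is a sketch, not the library implementation: `withRetryCount` is a hypothetical stand-in for `withRetryIntervalCount` that omits delays and backoff, and it assumes the attempt counter starts at 0.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)

-- Simplified stand-in for withRetryIntervalCount: passes the attempt number
-- and a `loop` continuation to the action. No sleeping or backoff here.
withRetryCount :: (Int -> IO () -> IO ()) -> IO ()
withRetryCount action = go 0
  where
    go n = action n (go (n + 1))

-- The guard from the plan: stop invoking the action once n reaches the limit.
withRetryLimit :: Int -> (IO () -> IO ()) -> IO ()
withRetryLimit maxRetries action =
  withRetryCount $ \n loop ->
    if n < maxRetries then action loop else pure ()

-- Runs an always-failing action and returns how many times it was attempted.
countAttempts :: Int -> IO Int
countAttempts maxRetries = do
  attempts <- newIORef (0 :: Int)
  withRetryLimit maxRetries $ \loop -> do
    modifyIORef' attempts (+ 1)
    loop -- simulate a temporary error: always ask for another attempt
  readIORef attempts
```

In this model an action that always calls `loop` is attempted exactly `maxRetries` times, after which control returns to the caller. In the worker, that return is the point where the next item is picked.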
|
||||
|
||||
---
|
||||
|
||||
## Files to modify
|
||||
|
||||
| File | Change |
|------|--------|
| `src/Simplex/Chat/Store/SQLite/Migrations/M20260429_relay_request_retries.hs` | New migration |
| `src/Simplex/Chat/Store/SQLite/Migrations.hs` | Register migration |
| `src/Simplex/Chat/Store/Postgres/Migrations/M20260429_relay_request_retries.hs` | New migration |
| `src/Simplex/Chat/Store/Postgres/Migrations.hs` | Register migration |
| `simplex-chat.cabal` | Add migration modules |
| `src/Simplex/Chat/Types.hs` | Add `relayRequestDelay`, `relayRequestRetries`, `relayRequestCreatedAt` to `RelayRequestData` |
| `src/Simplex/Chat/Store/RelayRequests.hs` | Retry ordering, `updateRelayRequestRetries` |
| `src/Simplex/Chat/Library/Subscriber.hs` | Limited retry with delay storage, expiry check in `retryTmpError` |
|
||||
|
||||
## Verification
|
||||
|
||||
1. **Build**: `cabal build --ghc-options=-O0`
2. **Run relay tests**: `cabal test simplex-chat-test --test-options='-m "relay"'`
3. **Scenarios**:
   - Request to unreachable server: retried 3 times per cycle, pushed to back of queue, marked failed after 1 day and 10+ retries
   - Request to reachable server: succeeds on first attempt, unaffected by changes
   - Multiple pending requests: stuck request doesn't block others; items with fewer retries are processed first
   - App restart with expired pending requests: worker starts, picks up expired request, attempts it. If it succeeds (server now reachable), it completes normally; if it fails, `retryTmpError` marks it failed
|
||||
|
||||
## Known considerations
|
||||
|
||||
1. **Single stuck item re-pickup**: If only one request is pending and it's stuck, the worker picks it up repeatedly (3 retries each cycle, immediate re-pickup). This is acceptable: backoff grows via stored delay, and the request is marked failed after 1 day and 10+ retries. The main protection is that other requests aren't blocked.

2. **`hasPendingRelayRequests` unchanged**: Expired requests still match the `hasPendingRelayRequests` query at startup, so the worker starts. It picks them up and attempts processing: if the server became reachable, the request succeeds normally. If it fails, `retryTmpError` checks the expiry condition and marks it failed. This is strictly better than filtering at query time: expired items get one last chance.

3. **Delay resumption across pickups**: Stored delay resumes backoff at the last level (XFTP pattern). After many cycles, delay reaches `maxInterval` and stays there. This means retry frequency stabilizes at a low rate for stuck items.

4. **Permanent errors unchanged**: Non-temp errors (validation, logic) still call `setRelayRequestErr` immediately, permanently excluding the item. The retry mechanism only affects `temporaryOrHostError`.

5. **`withWork_` re-signals work**: After the action returns (hitting max consecutive retries), `withWork_` has already called `hasWork` (re-signaling the doWork TMVar). The outer `forever` loop immediately proceeds to the next iteration. This is the desired behavior: the worker processes all pending items before waiting.

6. **`retries` count is from pickup time**: The `relayRequestRetries` value in `retryTmpError` is the count loaded when the item was picked up. Within a single pickup cycle (up to 3 consecutive retries), `updateRelayRequestRetries` increments the DB count but the local value stays the same. The expiry check uses the pickup-time count, which is at most 3 behind the DB. This is acceptable: the threshold (10) has margin.

7. **Migration column defaults**: `relay_request_retries NOT NULL DEFAULT 0` ensures existing pending requests start with 0 retries. `relay_request_delay` is nullable (NULL = use default reconnectInterval), matching the `Maybe Int64` field.
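
Consideration 3 (the delay stabilizing at `maxInterval`) can be sketched numerically. The growth rule below, multiplying by 1.5 per retry and capping at the maximum, is an assumption made for illustration; the actual rule is whatever the retry interval helpers in simplexmq implement. `nextDelay`, `delays` and `retriesToCap` are hypothetical names.

```haskell
import Data.Int (Int64)

-- Assumed growth rule: multiply by 1.5 per retry, capped at the maximum.
nextDelay :: Int64 -> Int64 -> Int64
nextDelay maxDelay d = min maxDelay (d * 3 `div` 2)

-- Delays seen on successive retries, starting from the initial delay.
delays :: Int64 -> Int64 -> [Int64]
delays maxDelay = iterate (nextDelay maxDelay)

-- Number of retries before the delay first reaches the cap.
-- Assumes initial >= 2; smaller values never grow under integer division.
retriesToCap :: Int64 -> Int64 -> Int
retriesToCap initial maxDelay =
  length (takeWhile (< maxDelay) (delays maxDelay initial))
```

Under this assumed rule, an initial delay of 5 seconds and a cap of 600 seconds (`retriesToCap 5000000 600000000`, in microseconds) reach the cap after 12 retries, after which a stuck request is retried at a constant, low rate until it expires.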
|
||||
@@ -130,6 +130,7 @@ library
|
||||
Simplex.Chat.Store.Postgres.Migrations.M20260122_has_link
|
||||
Simplex.Chat.Store.Postgres.Migrations.M20260222_chat_relays
|
||||
Simplex.Chat.Store.Postgres.Migrations.M20260403_item_viewed
|
||||
Simplex.Chat.Store.Postgres.Migrations.M20260429_relay_request_retries
|
||||
else
|
||||
exposed-modules:
|
||||
Simplex.Chat.Archive
|
||||
@@ -282,6 +283,7 @@ library
|
||||
Simplex.Chat.Store.SQLite.Migrations.M20260122_has_link
|
||||
Simplex.Chat.Store.SQLite.Migrations.M20260222_chat_relays
|
||||
Simplex.Chat.Store.SQLite.Migrations.M20260403_item_viewed
|
||||
Simplex.Chat.Store.SQLite.Migrations.M20260429_relay_request_retries
|
||||
other-modules:
|
||||
Paths_simplex_chat
|
||||
hs-source-dirs:
|
||||
|
||||
+5
-1
@@ -10,6 +10,7 @@
|
||||
{-# LANGUAGE PatternSynonyms #-}
|
||||
{-# LANGUAGE RankNTypes #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE NumericUnderscores #-}
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
|
||||
|
||||
@@ -26,7 +27,7 @@ import qualified Data.List.NonEmpty as L
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (fromMaybe, mapMaybe)
|
||||
import Data.Text (Text)
|
||||
-import Data.Time.Clock (getCurrentTime)
+import Data.Time.Clock (getCurrentTime, nominalDay)
|
||||
import Simplex.Chat.Controller
|
||||
import Simplex.Chat.Library.Commands
|
||||
import Simplex.Chat.Operators
|
||||
@@ -42,6 +43,7 @@ import Simplex.Chat.Util (shuffle)
|
||||
import Simplex.FileTransfer.Client.Presets (defaultXFTPServers)
|
||||
import Simplex.Messaging.Agent
|
||||
import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), InitialAgentServers (..), ServerCfg (..), allRoles, createAgentStore, defaultAgentConfig, presetServerCfg)
|
||||
import Simplex.Messaging.Agent.RetryInterval (RetryInterval (..))
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
import Simplex.Messaging.Agent.Store.Common (DBStore (dbNew))
|
||||
import qualified Simplex.Messaging.Agent.Store.DB as DB
|
||||
@@ -115,6 +117,8 @@ defaultChatConfig =
|
||||
deliveryWorkerDelay = 0,
|
||||
deliveryBucketSize = 10000,
|
||||
channelSubscriberRole = GRObserver,
|
||||
relayRequestRetryInterval = RetryInterval {initialInterval = 5_000000, increaseAfter = 0, maxInterval = 600_000000},
|
||||
relayRequestExpiry = (10, nominalDay),
|
||||
deviceNameForRemote = "",
|
||||
remoteCompression = True,
|
||||
chatHooks = defaultChatHooks
|
||||
|
||||
@@ -73,6 +73,7 @@ import Simplex.Messaging.Agent (AgentClient, DatabaseDiff, SubscriptionsInfo)
|
||||
import Simplex.Messaging.Agent.Client (AgentLocks, AgentQueuesInfo (..), AgentWorkersDetails (..), AgentWorkersSummary (..), ProtocolTestFailure, SMPServerSubs, ServerQueueInfo, UserNetworkInfo)
|
||||
import Simplex.Messaging.Agent.Env.SQLite (AgentConfig, NetworkConfig, ServerCfg, Worker)
|
||||
import Simplex.Messaging.Agent.Lock
|
||||
import Simplex.Messaging.Agent.RetryInterval (RetryInterval (..))
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
import Simplex.Messaging.Agent.Store.Common (DBStore, withTransaction, withTransactionPriority)
|
||||
import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation, UpMigration)
|
||||
@@ -158,6 +159,8 @@ data ChatConfig = ChatConfig
|
||||
deliveryWorkerDelay :: Int64, -- microseconds
|
||||
deliveryBucketSize :: Int,
|
||||
channelSubscriberRole :: GroupMemberRole, -- TODO [relays] starting role should be communicated in protocol from owner to relays
|
||||
relayRequestRetryInterval :: RetryInterval,
|
||||
relayRequestExpiry :: (Int, NominalDiffTime),
|
||||
highlyAvailable :: Bool,
|
||||
deviceNameForRemote :: Text,
|
||||
remoteCompression :: Bool,
|
||||
|
||||
@@ -37,7 +37,7 @@ import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing, mapMaybe)
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
import Data.Text.Encoding (decodeLatin1)
|
||||
-import Data.Time.Clock (UTCTime, diffUTCTime, getCurrentTime)
+import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, diffUTCTime, getCurrentTime)
|
||||
import qualified Data.UUID as UUID
|
||||
import qualified Data.UUID.V4 as V4
|
||||
import Data.Word (Word32)
|
||||
@@ -77,7 +77,7 @@ import Simplex.Messaging.Agent.Client (getAgentWorker, temporaryOrHostError, wai
|
||||
import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), Worker (..))
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
import qualified Simplex.Messaging.Agent.Protocol as AP (AgentErrorType (..))
|
||||
-import Simplex.Messaging.Agent.RetryInterval (withRetryInterval)
+import Simplex.Messaging.Agent.RetryInterval (RetryInterval (..), nextRetryDelay)
|
||||
import qualified Simplex.Messaging.Agent.Store.DB as DB
|
||||
import Simplex.Messaging.Client (NetworkRequestMode (..), ProxyClientError (..))
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
@@ -94,8 +94,9 @@ import Simplex.Messaging.Transport (TransportError (..))
|
||||
import Simplex.Messaging.Util
|
||||
import Simplex.Messaging.Version
|
||||
import qualified System.FilePath as FP
|
||||
import System.Mem.Weak (Weak)
|
||||
import Text.Read (readMaybe)
|
||||
-import UnliftIO.Concurrent (forkIO)
+import UnliftIO.Concurrent (ThreadId, forkIO, mkWeakThreadId)
|
||||
import UnliftIO.Directory
|
||||
import UnliftIO.STM
|
||||
|
||||
@@ -1492,7 +1493,8 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
||||
toViewTE $ TERejectingGroupJoinRequestMember user gInfo mem rjctReason
|
||||
xGrpRelayInv :: InvitationId -> VersionRangeChat -> GroupRelayInvitation -> CM ()
|
||||
xGrpRelayInv invId chatVRange groupRelayInv = do
|
||||
-(_gInfo, _ownerMember) <- withStore $ \db -> createRelayRequestGroup db vr user groupRelayInv invId chatVRange
+initialDelay <- asks $ initialInterval . relayRequestRetryInterval . config
+(_gInfo, _ownerMember) <- withStore $ \db -> createRelayRequestGroup db vr user groupRelayInv invId chatVRange initialDelay
|
||||
lift $ void $ getRelayRequestWorker True
|
||||
xGrpRelayTest :: InvitationId -> VersionRangeChat -> ByteString -> CM ()
|
||||
xGrpRelayTest invId chatVRange challenge = do
|
||||
@@ -3710,23 +3712,55 @@ runRelayRequestWorker a Worker {doWork} = do
|
||||
     user <- getRelayUser db
     UserContactLink {userContactLinkId} <- getUserAddress db user
     pure (user, userContactLinkId)
+  delayThreads <- liftIO TM.emptyIO
   forever $ do
     lift $ waitForWork doWork
-    runRelayRequestOperation vr user uclId
+    runRelayRequestOperation delayThreads vr user uclId
   where
-    runRelayRequestOperation :: VersionRangeChat -> User -> Int64 -> CM ()
-    runRelayRequestOperation vr user uclId =
-      withWork_ a doWork (withStore' getNextPendingRelayRequest) $
+    runRelayRequestOperation :: TM.TMap GroupId (TMVar (Weak ThreadId)) -> VersionRangeChat -> User -> Int64 -> CM ()
+    runRelayRequestOperation delayThreads vr user uclId =
+      withWork_ a doWork getReadyRelayRequest $
         \(groupId, rrd) -> do
-          ri <- asks $ reconnectInterval . agentConfig . config
-          withRetryInterval ri $ \_ loop -> do
-            liftIO $ waitWhileSuspended a
-            liftIO $ waitForUserNetwork a
-            processRelayRequest groupId rrd `catchAllErrors` retryTmpError loop groupId
+          ChatConfig {relayRequestExpiry} <- asks config
+          liftIO $ waitWhileSuspended a
+          liftIO $ waitForUserNetwork a
+          processRelayRequest groupId rrd `catchAllErrors` retryTmpError relayRequestExpiry groupId rrd
       where
-        retryTmpError :: CM () -> GroupId -> ChatError -> CM ()
-        retryTmpError loop groupId = \case
-          ChatErrorAgent {agentError} | temporaryOrHostError agentError -> loop
+        getReadyRelayRequest :: CM (Either StoreError (Maybe (GroupId, RelayRequestData)))
+        getReadyRelayRequest =
+          withStore' getNextPendingRelayRequest >>= \case
+            Right (Just (groupId, rrd@RelayRequestData {reqExecuteAt})) -> do
+              currentTs <- liftIO getCurrentTime
+              let delay = diffUTCTime reqExecuteAt currentTs
+              if delay <= 1
+                then pure $ Right (Just (groupId, rrd))
+                else Right Nothing <$ scheduleRequest groupId delay
+            r -> pure r
+        scheduleRequest :: GroupId -> NominalDiffTime -> CM ()
+        scheduleRequest groupId delay = do
+          v_ <- liftIO $ atomically $
+            ifM
+              (isNothing <$> TM.lookup groupId delayThreads)
+              (newEmptyTMVar >>= \v -> TM.insert groupId v delayThreads $> Just v)
+              (pure Nothing)
+          forM_ v_ $ \v -> do
+            tId <- liftIO $ forkIO $ do
+              threadDelay' $ diffToMicroseconds delay
+              atomically $ TM.delete groupId delayThreads
+              void $ atomically $ tryPutTMVar doWork ()
+            weakTId <- liftIO $ mkWeakThreadId tId
+            liftIO $ atomically $ putTMVar v weakTId
+        retryTmpError :: (Int, NominalDiffTime) -> GroupId -> RelayRequestData -> ChatError -> CM ()
+        retryTmpError (retriesThreshold, ttl) groupId RelayRequestData {reqDelay, reqRetries, reqCreatedAt} = \case
+          ChatErrorAgent {agentError} | temporaryOrHostError agentError -> do
+            currentTs <- liftIO getCurrentTime
+            if reqRetries >= retriesThreshold && diffUTCTime currentTs reqCreatedAt >= ttl
+              then withStore' $ \db -> setRelayRequestErr db groupId "expired"
+              else do
+                ri <- asks $ relayRequestRetryInterval . config
+                let executeAt = addUTCTime (fromIntegral reqDelay / 1000000) currentTs
+                    nextDelay = nextRetryDelay 0 reqDelay ri
+                withStore' $ \db -> updateRelayRequestRetries db groupId nextDelay executeAt
           e -> do
             withStore' $ \db -> setRelayRequestErr db groupId (tshow e)
             eToView e
|
||||
|
||||
@@ -1515,8 +1515,8 @@ setGroupInProgressDone db GroupInfo {groupId} = do
|
||||
"UPDATE groups SET creating_in_progress = 0, updated_at = ? WHERE group_id = ?"
|
||||
(currentTs, groupId)
|
||||
|
||||
-createRelayRequestGroup :: DB.Connection -> VersionRangeChat -> User -> GroupRelayInvitation -> InvitationId -> VersionRangeChat -> ExceptT StoreError IO (GroupInfo, GroupMember)
-createRelayRequestGroup db vr user@User {userId} GroupRelayInvitation {fromMember, fromMemberProfile, relayMemberId, groupLink} invId reqChatVRange = do
+createRelayRequestGroup :: DB.Connection -> VersionRangeChat -> User -> GroupRelayInvitation -> InvitationId -> VersionRangeChat -> Int64 -> ExceptT StoreError IO (GroupInfo, GroupMember)
+createRelayRequestGroup db vr user@User {userId} GroupRelayInvitation {fromMember, fromMemberProfile, relayMemberId, groupLink} invId reqChatVRange initialDelay = do
|
||||
currentTs <- liftIO getCurrentTime
|
||||
-- Create group with placeholder profile
|
||||
let Profile {displayName = fromMemberLDN} = fromMemberProfile
|
||||
@@ -1532,7 +1532,7 @@ createRelayRequestGroup db vr user@User {userId} GroupRelayInvitation {fromMembe
|
||||
}
|
||||
(groupId, _groupLDN) <- createGroup_ db userId placeholderProfile Nothing Nothing True (Just RSInvited) Nothing currentTs
|
||||
-- Store relay request data for recovery
|
||||
-liftIO $ setRelayRequestData_ groupId
+liftIO $ setRelayRequestData_ groupId currentTs
|
||||
ownerMemberId <- insertOwner_ currentTs groupId
|
||||
let relayMember = MemberIdRole relayMemberId GRRelay
|
||||
-- TODO [member keys] should relays use member keys?
|
||||
@@ -1541,7 +1541,7 @@ createRelayRequestGroup db vr user@User {userId} GroupRelayInvitation {fromMembe
|
||||
g <- getGroupInfo db vr user groupId
|
||||
pure (g, ownerMember)
|
||||
where
|
||||
-setRelayRequestData_ groupId =
+setRelayRequestData_ groupId currentTs =
|
||||
DB.execute
|
||||
db
|
||||
[sql|
|
||||
@@ -1549,10 +1549,12 @@ createRelayRequestGroup db vr user@User {userId} GroupRelayInvitation {fromMembe
|
||||
SET relay_request_inv_id = ?,
|
||||
relay_request_group_link = ?,
|
||||
relay_request_peer_chat_min_version = ?,
|
||||
-relay_request_peer_chat_max_version = ?
+relay_request_peer_chat_max_version = ?,
+relay_request_delay = ?,
+relay_request_execute_at = ?
|
||||
WHERE group_id = ?
|
||||
|]
|
||||
-(Binary invId, groupLink, minVersion reqChatVRange, maxVersion reqChatVRange, groupId)
+(Binary invId, groupLink, minVersion reqChatVRange, maxVersion reqChatVRange, initialDelay, currentTs, groupId)
|
||||
insertOwner_ currentTs groupId = do
|
||||
let MemberIdRole {memberId, memberRole} = fromMember
|
||||
VersionRange minV maxV = reqChatVRange
|
||||
|
||||
@@ -28,6 +28,7 @@ import Simplex.Chat.Store.Postgres.Migrations.M20260108_chat_indices
|
||||
import Simplex.Chat.Store.Postgres.Migrations.M20260122_has_link
|
||||
import Simplex.Chat.Store.Postgres.Migrations.M20260222_chat_relays
|
||||
import Simplex.Chat.Store.Postgres.Migrations.M20260403_item_viewed
|
||||
import Simplex.Chat.Store.Postgres.Migrations.M20260429_relay_request_retries
|
||||
import Simplex.Messaging.Agent.Store.Shared (Migration (..))
|
||||
|
||||
schemaMigrations :: [(String, Text, Maybe Text)]
|
||||
@@ -55,7 +56,8 @@ schemaMigrations =
|
||||
("20260108_chat_indices", m20260108_chat_indices, Just down_m20260108_chat_indices),
|
||||
("20260122_has_link", m20260122_has_link, Just down_m20260122_has_link),
|
||||
("20260222_chat_relays", m20260222_chat_relays, Just down_m20260222_chat_relays),
|
||||
-("20260403_item_viewed", m20260403_item_viewed, Just down_m20260403_item_viewed)
+("20260403_item_viewed", m20260403_item_viewed, Just down_m20260403_item_viewed),
+("20260429_relay_request_retries", m20260429_relay_request_retries, Just down_m20260429_relay_request_retries)
|
||||
]
|
||||
|
||||
-- | The list of migrations in ascending order by date
|
||||
|
||||
@@ -0,0 +1,23 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
|
||||
module Simplex.Chat.Store.Postgres.Migrations.M20260429_relay_request_retries where
|
||||
|
||||
import Data.Text (Text)
|
||||
import Text.RawString.QQ (r)
|
||||
|
||||
m20260429_relay_request_retries :: Text
|
||||
m20260429_relay_request_retries =
|
||||
[r|
|
||||
ALTER TABLE groups ADD COLUMN relay_request_retries BIGINT NOT NULL DEFAULT 0;
|
||||
ALTER TABLE groups ADD COLUMN relay_request_delay BIGINT NOT NULL DEFAULT 0;
|
||||
ALTER TABLE groups ADD COLUMN relay_request_execute_at TIMESTAMPTZ NOT NULL DEFAULT (now());
|
||||
|]
|
||||
|
||||
down_m20260429_relay_request_retries :: Text
|
||||
down_m20260429_relay_request_retries =
|
||||
[r|
|
||||
ALTER TABLE groups DROP COLUMN relay_request_retries;
|
||||
ALTER TABLE groups DROP COLUMN relay_request_delay;
|
||||
ALTER TABLE groups DROP COLUMN relay_request_execute_at;
|
||||
|]
|
||||
@@ -959,7 +959,10 @@ CREATE TABLE test_chat_schema.groups (
|
||||
root_priv_key bytea,
|
||||
root_pub_key bytea,
|
||||
member_priv_key bytea,
|
||||
-public_member_count bigint
+public_member_count bigint,
+relay_request_retries bigint DEFAULT 0 NOT NULL,
+relay_request_delay bigint DEFAULT 0 NOT NULL,
+relay_request_execute_at timestamp with time zone DEFAULT now() NOT NULL
|
||||
);
|
||||
|
||||
|
||||
|
||||
@@ -9,13 +9,15 @@
|
||||
module Simplex.Chat.Store.RelayRequests
|
||||
( hasPendingRelayRequests,
|
||||
getNextPendingRelayRequest,
|
||||
updateRelayRequestRetries,
|
||||
setRelayRequestErr,
|
||||
)
|
||||
where
|
||||
|
||||
import Data.Int (Int64)
|
||||
import Data.Maybe (fromMaybe)
|
||||
import Data.Text (Text)
|
||||
-import Data.Time.Clock (getCurrentTime)
+import Data.Time.Clock (UTCTime, getCurrentTime)
|
||||
import Simplex.Chat.Store.Shared
|
||||
import Simplex.Chat.Types
|
||||
import Simplex.Chat.Types.Shared
|
||||
@@ -64,7 +66,7 @@ getNextPendingRelayRequest db =
|
||||
WHERE relay_own_status = ?
|
||||
AND relay_request_failed = 0
|
||||
AND relay_request_err_reason IS NULL
|
||||
-ORDER BY group_id ASC
+ORDER BY relay_request_execute_at ASC
|
||||
LIMIT 1
|
||||
|]
|
||||
(Only RSInvited)
|
||||
@@ -76,18 +78,27 @@ getNextPendingRelayRequest db =
|
||||
[sql|
|
||||
SELECT
|
||||
relay_request_inv_id, relay_request_group_link,
|
||||
-relay_request_peer_chat_min_version, relay_request_peer_chat_max_version
+relay_request_peer_chat_min_version, relay_request_peer_chat_max_version,
+relay_request_delay, relay_request_retries, created_at, relay_request_execute_at
|
||||
FROM groups
|
||||
WHERE group_id = ?
|
||||
|]
|
||||
(Only groupId)
|
||||
where
|
||||
-toRelayRequestData :: (Maybe InvitationId, Maybe ShortLinkContact, Maybe VersionChat, Maybe VersionChat) -> Either StoreError (GroupId, RelayRequestData)
+toRelayRequestData :: (Maybe InvitationId, Maybe ShortLinkContact, Maybe VersionChat, Maybe VersionChat, Int64, Int, UTCTime, UTCTime) -> Either StoreError (GroupId, RelayRequestData)
|
||||
toRelayRequestData = \case
|
||||
-(Just relayInvId, Just reqGroupLink, Just minV, Just maxV) ->
-Right (groupId, RelayRequestData {relayInvId, reqGroupLink, reqChatVRange = fromMaybe (versionToRange maxV) $ safeVersionRange minV maxV})
+(Just relayInvId, Just reqGroupLink, Just minV, Just maxV, reqDelay, reqRetries, reqCreatedAt, reqExecuteAt) ->
+Right (groupId, RelayRequestData {relayInvId, reqGroupLink, reqChatVRange = fromMaybe (versionToRange maxV) $ safeVersionRange minV maxV, reqDelay, reqRetries, reqCreatedAt, reqExecuteAt})
|
||||
_ -> Left $ SEInternalError "missing relay request data"
|
||||
|
||||
updateRelayRequestRetries :: DB.Connection -> GroupId -> Int64 -> UTCTime -> IO ()
|
||||
updateRelayRequestRetries db groupId delay executeAt = do
|
||||
currentTs <- getCurrentTime
|
||||
DB.execute
|
||||
db
|
||||
"UPDATE groups SET relay_request_retries = relay_request_retries + 1, relay_request_delay = ?, relay_request_execute_at = ?, updated_at = ? WHERE group_id = ?"
|
||||
(delay, executeAt, currentTs, groupId)
|
||||
|
||||
markRelayRequestFailed :: DB.Connection -> GroupId -> IO ()
|
||||
markRelayRequestFailed db groupId = do
|
||||
currentTs <- getCurrentTime
|
||||
|
||||
@@ -151,6 +151,7 @@ import Simplex.Chat.Store.SQLite.Migrations.M20260108_chat_indices
|
||||
import Simplex.Chat.Store.SQLite.Migrations.M20260122_has_link
|
||||
import Simplex.Chat.Store.SQLite.Migrations.M20260222_chat_relays
|
||||
import Simplex.Chat.Store.SQLite.Migrations.M20260403_item_viewed
|
||||
import Simplex.Chat.Store.SQLite.Migrations.M20260429_relay_request_retries
|
||||
import Simplex.Messaging.Agent.Store.Shared (Migration (..))
|
||||
|
||||
schemaMigrations :: [(String, Query, Maybe Query)]
|
||||
@@ -301,7 +302,8 @@ schemaMigrations =
|
||||
("20260108_chat_indices", m20260108_chat_indices, Just down_m20260108_chat_indices),
|
||||
("20260122_has_link", m20260122_has_link, Just down_m20260122_has_link),
|
||||
("20260222_chat_relays", m20260222_chat_relays, Just down_m20260222_chat_relays),
|
||||
("20260403_item_viewed", m20260403_item_viewed, Just down_m20260403_item_viewed)
|
||||
("20260403_item_viewed", m20260403_item_viewed, Just down_m20260403_item_viewed),
|
||||
("20260429_relay_request_retries", m20260429_relay_request_retries, Just down_m20260429_relay_request_retries)
|
||||
]
|
||||
|
||||
-- | The list of migrations in ascending order by date
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
|
||||
module Simplex.Chat.Store.SQLite.Migrations.M20260429_relay_request_retries where
|
||||
|
||||
import Database.SQLite.Simple (Query)
|
||||
import Database.SQLite.Simple.QQ (sql)
|
||||
|
||||
m20260429_relay_request_retries :: Query
|
||||
m20260429_relay_request_retries =
|
||||
[sql|
|
||||
ALTER TABLE groups ADD COLUMN relay_request_retries INTEGER NOT NULL DEFAULT 0;
|
||||
ALTER TABLE groups ADD COLUMN relay_request_delay INTEGER NOT NULL DEFAULT 0;
|
||||
ALTER TABLE groups ADD COLUMN relay_request_execute_at TEXT NOT NULL DEFAULT(datetime('now'));
|
||||
|]
|
||||
|
||||
down_m20260429_relay_request_retries :: Query
|
||||
down_m20260429_relay_request_retries =
|
||||
[sql|
|
||||
ALTER TABLE groups DROP COLUMN relay_request_retries;
|
||||
ALTER TABLE groups DROP COLUMN relay_request_delay;
|
||||
ALTER TABLE groups DROP COLUMN relay_request_execute_at;
|
||||
|]
|
||||
@@ -695,7 +695,8 @@ SEARCH delivery_jobs USING INTEGER PRIMARY KEY (rowid=?)
|
||||
Query:
|
||||
SELECT
|
||||
relay_request_inv_id, relay_request_group_link,
|
||||
relay_request_peer_chat_min_version, relay_request_peer_chat_max_version
|
||||
relay_request_peer_chat_min_version, relay_request_peer_chat_max_version,
|
||||
relay_request_delay, relay_request_retries, created_at, relay_request_execute_at
|
||||
FROM groups
|
||||
WHERE group_id = ?
|
||||
|
||||
@@ -993,11 +994,12 @@ Query:
|
||||
WHERE relay_own_status = ?
|
||||
AND relay_request_failed = 0
|
||||
AND relay_request_err_reason IS NULL
|
||||
ORDER BY group_id ASC
|
||||
ORDER BY relay_request_execute_at ASC
|
||||
LIMIT 1
|
||||
|
||||
Plan:
|
||||
SCAN groups
|
||||
USE TEMP B-TREE FOR ORDER BY
|
||||
|
||||
Query:
|
||||
SELECT i.chat_item_id
|
||||
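The query-plan hunk above changes the ordering of the pending-request query from `group_id ASC` to `relay_request_execute_at ASC`, so the request scheduled earliest is picked first and a request that was just rescheduled moves behind the others. A small in-memory model of that selection is sketched below. `PendingRequest`, `nextRequest` and `waitSeconds` are hypothetical names, timestamps are plain seconds, and only the `relay_request_failed` filter is modeled. Whether the worker sleeps until the scheduled time is not shown in this part of the diff, so `waitSeconds` is an assumption.

```haskell
import Data.Int (Int64)
import Data.List (sortOn)
import Data.Maybe (listToMaybe)

-- Hypothetical in-memory stand-in for a row of the groups table.
data PendingRequest = PendingRequest
  { prGroupId :: Int64,
    prExecuteAt :: Int64, -- scheduled time, in seconds (assumption)
    prFailed :: Bool
  }
  deriving (Eq, Show)

-- Mirrors "WHERE relay_request_failed = 0 ORDER BY relay_request_execute_at ASC LIMIT 1".
nextRequest :: [PendingRequest] -> Maybe PendingRequest
nextRequest = listToMaybe . sortOn prExecuteAt . filter (not . prFailed)

-- How long a worker would have to wait before the request is due (0 if overdue).
waitSeconds :: Int64 -> PendingRequest -> Int64
waitSeconds now r = max 0 (prExecuteAt r - now)
```

The plan output `SCAN groups` with `USE TEMP B-TREE FOR ORDER BY` corresponds to the `filter` and `sortOn` steps here: every row is examined and the matches are sorted before the first one is taken.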
@@ -1775,7 +1777,9 @@ Query:
|
||||
SET relay_request_inv_id = ?,
|
||||
relay_request_group_link = ?,
|
||||
relay_request_peer_chat_min_version = ?,
|
||||
relay_request_peer_chat_max_version = ?
|
||||
relay_request_peer_chat_max_version = ?,
|
||||
relay_request_delay = ?,
|
||||
relay_request_execute_at = ?
|
||||
WHERE group_id = ?
|
||||
|
||||
Plan:
|
||||
|
||||
@@ -173,7 +173,10 @@ CREATE TABLE groups(
|
||||
root_priv_key BLOB,
|
||||
root_pub_key BLOB,
|
||||
member_priv_key BLOB,
|
||||
public_member_count INTEGER, -- received
|
||||
public_member_count INTEGER,
|
||||
relay_request_retries INTEGER NOT NULL DEFAULT 0,
|
||||
relay_request_delay INTEGER NOT NULL DEFAULT 0,
|
||||
relay_request_execute_at TEXT NOT NULL DEFAULT(datetime('now')), -- received
|
||||
FOREIGN KEY(user_id, local_display_name)
|
||||
REFERENCES display_names(user_id, local_display_name)
|
||||
ON DELETE CASCADE
|
||||
|
||||
@@ -1045,7 +1045,11 @@ data GroupMember = GroupMember
|
||||
data RelayRequestData = RelayRequestData
|
||||
{ relayInvId :: InvitationId,
|
||||
reqGroupLink :: ShortLinkContact,
|
||||
reqChatVRange :: VersionRangeChat
|
||||
reqChatVRange :: VersionRangeChat,
|
||||
reqDelay :: Int64,
|
||||
reqRetries :: Int,
|
||||
reqCreatedAt :: UTCTime,
|
||||
reqExecuteAt :: UTCTime
|
||||
}
|
||||
deriving (Eq, Show)
|
||||
|
||||
|
||||
@@ -235,7 +235,6 @@ chatGroupTests = do
|
||||
it "should respect support preference in channel" testSupportPreferenceChannel
|
||||
-- TODO [relays] add tests for channels
|
||||
-- TODO - tests with delivery loop over members restored after restart
|
||||
-- TODO - delivery in support scopes inside channels
|
||||
-- TODO - connect plans for relay groups
|
||||
-- TODO - cancellation on failure to create relay group (for owner)
|
||||
-- TODO - async retry connecting to relay (for members)
|
||||
|
||||