diff --git a/src/Simplex/Chat/Delivery.hs b/src/Simplex/Chat/Delivery.hs index 0071ce19ed..e54360ac0b 100644 --- a/src/Simplex/Chat/Delivery.hs +++ b/src/Simplex/Chat/Delivery.hs @@ -161,16 +161,30 @@ instance TextEncoding DeliveryTaskStatus where data MessageDeliveryJob = MessageDeliveryJob { jobId :: Int64, jobScope :: DeliveryJobScope, - singleSenderGMId_ :: Maybe GroupMemberId, -- Just for single-sender deliveries, Nothing for multi-sender deliveries - -- All distinct senders contributing to this job's body. Used by relay groups - -- to disseminate sender profiles on demand. For single-sender jobs the - -- fast path uses singleSenderGMId_ and this list may be empty. - senderGMIds :: [GroupMemberId], + jobSenders :: JobSenders, body :: ByteString, cursorGMId_ :: Maybe GroupMemberId } deriving (Show) +-- | Senders contributing to a delivery job's body. The two storage columns +-- (single_sender_group_member_id, sender_group_member_ids) are mutually +-- exclusive by construction here: SingleSender persists only the column, +-- MultiSender persists only the list (which may be empty for jobs with no +-- relevant senders, e.g. DJRelayRemoved triggered by the relay leaving). +data JobSenders + = SingleSender GroupMemberId + | MultiSender [GroupMemberId] + deriving (Show) + +-- | The sender member id used by getGroupMembersByCursor to exclude the sender +-- from recipients in the single-sender fast path. Nothing for multi-sender jobs, +-- which means cursor returns all current members (senders themselves included). +singleSenderGMId :: JobSenders -> Maybe GroupMemberId +singleSenderGMId = \case + SingleSender s -> Just s + MultiSender _ -> Nothing + deliveryJobId :: MessageDeliveryJob -> Int64 deliveryJobId = jobId diff --git a/src/Simplex/Chat/Library/Commands.hs b/src/Simplex/Chat/Library/Commands.hs index b37b7099b7..878d9b7fb9 100644 --- a/src/Simplex/Chat/Library/Commands.hs +++ b/src/Simplex/Chat/Library/Commands.hs @@ -57,7 +57,7 @@ import qualified Data.UUID.V4 as V4 import Simplex.Chat.Library.Subscriber import Simplex.Chat.Call import Simplex.Chat.Controller -import Simplex.Chat.Delivery (DeliveryJobScope (..), DeliveryJobSpec (..), DeliveryWorkerScope (..)) +import Simplex.Chat.Delivery (DeliveryJobScope (..), DeliveryJobSpec (..), DeliveryWorkerScope (..), JobSenders (..)) import Simplex.Chat.Files import Simplex.Chat.Markdown import Simplex.Chat.Messages @@ -2942,7 +2942,7 @@ processChatCommand vr nm = \case withFastStore' $ \db -> do deleteGroupDeliveryTasks db gInfo deleteGroupDeliveryJobs db gInfo - createMsgDeliveryJob db gInfo (DJSGroup {jobSpec = DJRelayRemoved}) Nothing [] body + createMsgDeliveryJob db gInfo (DJSGroup {jobSpec = DJRelayRemoved}) (MultiSender []) body lift . void $ getDeliveryJobWorker True (groupId, DWSGroup) pure msg leaveGroupSendMsg user gInfo = do diff --git a/src/Simplex/Chat/Library/Subscriber.hs b/src/Simplex/Chat/Library/Subscriber.hs index bb9f5e62b3..0c478457e8 100644 --- a/src/Simplex/Chat/Library/Subscriber.hs +++ b/src/Simplex/Chat/Library/Subscriber.hs @@ -3578,22 +3578,34 @@ runDeliveryTaskWorker a deliveryKey Worker {doWork} = do | otherwise -> withWorkItems a doWork (withStore' $ \db -> getNextDeliveryTasks db gInfo task) $ \nextTasks -> do let (body, taskIds, largeTaskIds) = batchDeliveryTasks1 vr maxEncodedMsgLength nextTasks - single = singleSenderGMId_ nextTasks - senders = case single of - Just _ -> [] -- fast path: job uses single_sender_group_member_id - Nothing -> distinctSenderGMIds nextTasks + jobSenders + -- Relay groups: tasks in nextTasks may have different senders. + -- Only those accepted by batchDeliveryTasks1 (i.e. in `body`) + -- contribute to dissemination tracking; "large" tasks (DTSError) + -- and overflow-rejected tasks (left DTSNew for a later run) are + -- not in body and their senders must not be marked as already + -- disseminated by this job. + | useRelays' gInfo = + let acceptedTasks = filter ((`elem` taskIds) . deliveryTaskId) (L.toList nextTasks) + in sendersFromTasks acceptedTasks + -- Fully-connected groups: getNextDeliveryTasks filters by + -- sender_group_member_id, so all nextTasks share one sender. Use + -- the outer task's sender even when acceptedTasks is empty (all + -- batched tasks rejected as "large"), preserving the fast-path + -- contract that the worker has a known sender. + | otherwise = SingleSender (taskSenderGMId task) withStore' $ \db -> do - createMsgDeliveryJob db gInfo jobScope single senders body + createMsgDeliveryJob db gInfo jobScope jobSenders body forM_ taskIds $ \taskId -> updateDeliveryTaskStatus db taskId DTSProcessed forM_ largeTaskIds $ \taskId -> setDeliveryTaskErrStatus db taskId "large" lift . void $ getDeliveryJobWorker True deliveryKey where - singleSenderGMId_ :: NonEmpty MessageDeliveryTask -> Maybe GroupMemberId - singleSenderGMId_ (MessageDeliveryTask {senderGMId = senderGMId'} :| ts) - | all (\MessageDeliveryTask {senderGMId} -> senderGMId == senderGMId') ts = Just senderGMId' - | otherwise = Nothing - distinctSenderGMIds :: NonEmpty MessageDeliveryTask -> [GroupMemberId] - distinctSenderGMIds = nub . map (\MessageDeliveryTask {senderGMId} -> senderGMId) . L.toList + taskSenderGMId :: MessageDeliveryTask -> GroupMemberId + taskSenderGMId MessageDeliveryTask {senderGMId} = senderGMId + sendersFromTasks :: [MessageDeliveryTask] -> JobSenders + sendersFromTasks ts = case nub (map taskSenderGMId ts) of + [s] -> SingleSender s + ss -> MultiSender ss -- DJRelayRemoved is allowed when RSInactive - it forwards XGrpMemDel about relay's own deletion DJRelayRemoved | workerScope /= DWSGroup -> @@ -3603,7 +3615,7 @@ runDeliveryTaskWorker a deliveryKey Worker {doWork} = do fwd = GrpMsgForward {fwdSender, fwdBrokerTs} body = encodeBinaryBatch [encodeFwdElement fwd verifiedMsg] withStore' $ \db -> do - createMsgDeliveryJob db gInfo jobScope (Just senderGMId) [] body + createMsgDeliveryJob db gInfo jobScope (SingleSender senderGMId) body updateDeliveryTaskStatus db (deliveryTaskId task) DTSProcessed lift . void $ getDeliveryJobWorker True deliveryKey @@ -3663,7 +3675,13 @@ runDeliveryJobWorker a deliveryKey Worker {doWork} = do deleteGroupConnections user gInfo True withStore' $ \db -> updateDeliveryJobStatus db jobId DJSComplete where - MessageDeliveryJob {jobId, jobScope, singleSenderGMId_, senderGMIds, body, cursorGMId_ = startingCursor} = job + MessageDeliveryJob {jobId, jobScope, jobSenders, body, cursorGMId_ = startingCursor} = job + -- Derived once for use by both the channel and fully-connected branches + -- and by the cursor query's sender-exclusion filter. + singleSenderGMId_ = singleSenderGMId jobSenders + allSenderGMIds = case jobSenders of + SingleSender s -> [s] + MultiSender ss -> ss sendBodyToMembers :: CM () sendBodyToMembers -- channel @@ -3671,15 +3689,15 @@ runDeliveryJobWorker a deliveryKey Worker {doWork} = do -- there's no member review in channels, so job spec includePending is ignored DJSGroup {} -> do bucketSize <- asks $ deliveryBucketSize . config - -- distinct senders contributing to this body — fast path: single sender from - -- the job column; slow path: decoded blob persisted at job creation time. - let allSenderGMIds = case singleSenderGMId_ of - Just s -> [s] - Nothing -> senderGMIds - senderProfiles <- forM allSenderGMIds $ \sId -> do - sender <- withStore $ \db -> getGroupMemberById db vr user sId - vec <- withStore' $ \db -> getSentProfileVector db sId - pure (sender, vec) + -- (sender, vector) snapshot read in one transaction so partition runs + -- against a consistent point-in-time view. Without this, an xInfoMember + -- racing the worker could yield (new profile, old vector) or (old + -- profile, cleared vector), making partitioning meaningless. + senderProfiles <- withStore $ \db -> + forM allSenderGMIds $ \sId -> do + sender <- getGroupMemberById db vr user sId + vec <- liftIO $ getSentProfileVector db sId + pure (sender, vec) -- extBody captures each sender's profile at job-start. If the profile -- is updated mid-job (xInfoMember clears the vector and queues XInfo -- as its own job), recipients of this job receive the pre-update @@ -3733,8 +3751,8 @@ runDeliveryJobWorker a deliveryKey Worker {doWork} = do -- fully connected group | otherwise = case singleSenderGMId_ of Nothing -> throwChatError $ CEInternalError "delivery job worker: singleSenderGMId is required when not using relays" - Just singleSenderGMId -> do - sender <- withStore $ \db -> getGroupMemberById db vr user singleSenderGMId + Just sId -> do + sender <- withStore $ \db -> getGroupMemberById db vr user sId ms <- buildMemberList sender unless (null ms) $ deliver body ms where diff --git a/src/Simplex/Chat/Store/Delivery.hs b/src/Simplex/Chat/Store/Delivery.hs index ede680b411..3cc1f210ff 100644 --- a/src/Simplex/Chat/Store/Delivery.hs +++ b/src/Simplex/Chat/Store/Delivery.hs @@ -247,8 +247,8 @@ deleteDoneDeliveryTasks db createdAtCutoff = do |] (createdAtCutoff, DTSProcessed, DTSError) -createMsgDeliveryJob :: DB.Connection -> GroupInfo -> DeliveryJobScope -> Maybe GroupMemberId -> [GroupMemberId] -> ByteString -> IO () -createMsgDeliveryJob db gInfo jobScope singleSenderGMId_ senderGMIds body = do +createMsgDeliveryJob :: DB.Connection -> GroupInfo -> DeliveryJobScope -> JobSenders -> ByteString -> IO () +createMsgDeliveryJob db gInfo jobScope jobSenders body = do currentTs <- getCurrentTime DB.execute db @@ -259,16 +259,17 @@ createMsgDeliveryJob db gInfo jobScope singleSenderGMId_ senderGMIds body = do single_sender_group_member_id, sender_group_member_ids, body, job_status, created_at, updated_at ) VALUES (?,?,?,?,?,?,?,?,?,?,?) |] - ((Only groupId) :. jobScopeRow_ jobScope :. (singleSenderGMId_, encodedSenderGMIds, Binary body, DJSPending, currentTs, currentTs)) + ((Only groupId) :. jobScopeRow_ jobScope :. (singleColumn, multiColumn, Binary body, DJSPending, currentTs, currentTs)) where GroupInfo {groupId} = gInfo - -- For single-sender jobs the sender-list column is unused at execution time - -- (the fast path reads single_sender_group_member_id instead). Skip persisting - -- an empty blob for the multi-sender list to keep storage minimal. - encodedSenderGMIds :: Maybe (Binary ByteString) - encodedSenderGMIds = case senderGMIds of - [] -> Nothing - _ -> Just $ Binary $ smpEncodeList senderGMIds + -- The two columns are mutually exclusive by construction: SingleSender + -- writes only single_sender_group_member_id, MultiSender writes only + -- sender_group_member_ids (and the latter as NULL when the list is empty, + -- e.g. DJRelayRemoved with no senders to disseminate). + (singleColumn, multiColumn) = case jobSenders of + SingleSender s -> (Just s, Nothing) + MultiSender [] -> (Nothing, Nothing) + MultiSender ss -> (Nothing, Just $ Binary $ smpEncodeList ss) getPendingDeliveryJobScopes :: DB.Connection -> IO [DeliveryWorkerKey] getPendingDeliveryJobScopes db = @@ -318,15 +319,20 @@ getNextDeliveryJob db deliveryKey = do (Only jobId) where toDeliveryJob :: MessageDeliveryJobRow -> Either StoreError MessageDeliveryJob - toDeliveryJob ((Only jobId') :. jobScopeRow :. (singleSenderGMId_, senderGMIds_, Binary body, cursorGMId_)) = do + toDeliveryJob ((Only jobId') :. jobScopeRow :. (singleColumn, multiColumn, Binary body, cursorGMId_)) = do jobScope <- maybe (Left $ SEInvalidDeliveryJob jobId') Right $ toJobScope_ jobScopeRow - -- sender_group_member_ids is written by smpEncodeList; a parse failure means - -- on-disk corruption or a format change. Surface as job error rather than silently - -- degrade to the pre-feature "unknown member" behavior. - senderGMIds <- case senderGMIds_ of - Nothing -> Right [] - Just (Binary bs) -> first (const $ SEInvalidDeliveryJob jobId') $ parseAll smpListP bs - Right $ MessageDeliveryJob {jobId = jobId', jobScope, singleSenderGMId_, senderGMIds, body, cursorGMId_} + jobSenders <- case (singleColumn, multiColumn) of + (Just s, Nothing) -> Right $ SingleSender s + (Nothing, Nothing) -> Right $ MultiSender [] + -- sender_group_member_ids is written by smpEncodeList; a parse failure means + -- on-disk corruption or a format change. Surface as job error rather than + -- silently degrading to the pre-feature "unknown member" behavior. + (Nothing, Just (Binary bs)) -> + first (const $ SEInvalidDeliveryJob jobId') $ MultiSender <$> parseAll smpListP bs + -- Both columns set is a writer-side bug; createMsgDeliveryJob enforces + -- mutual exclusion. Surface rather than silently pick one. + (Just _, Just _) -> Left $ SEInvalidDeliveryJob jobId' + Right $ MessageDeliveryJob {jobId = jobId', jobScope, jobSenders, body, cursorGMId_} markJobFailed :: Int64 -> IO () markJobFailed jobId = DB.execute db "UPDATE delivery_jobs SET failed = 1 where delivery_job_id = ?" (Only jobId) diff --git a/src/Simplex/Chat/Store/Postgres/Migrations/M20260513_sent_profile_vector.hs b/src/Simplex/Chat/Store/Postgres/Migrations/M20260513_sent_profile_vector.hs index 9f324ecd29..b93ac3aa92 100644 --- a/src/Simplex/Chat/Store/Postgres/Migrations/M20260513_sent_profile_vector.hs +++ b/src/Simplex/Chat/Store/Postgres/Migrations/M20260513_sent_profile_vector.hs @@ -1,6 +1,10 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE QuasiQuotes #-} +-- Member profile dissemination in relay-mediated groups (task 001): +-- group_members.sent_profile_vector — per-recipient byte vector of "did this member's profile reach you?" +-- delivery_jobs.sender_group_member_ids — multi-sender batch: senders whose body fragments are in this job +-- Both columns serve the same feature; the migration name only references the primary column. module Simplex.Chat.Store.Postgres.Migrations.M20260513_sent_profile_vector where import Data.Text (Text) diff --git a/src/Simplex/Chat/Store/SQLite/Migrations/M20260513_sent_profile_vector.hs b/src/Simplex/Chat/Store/SQLite/Migrations/M20260513_sent_profile_vector.hs index c49d5177e4..ddabc1a318 100644 --- a/src/Simplex/Chat/Store/SQLite/Migrations/M20260513_sent_profile_vector.hs +++ b/src/Simplex/Chat/Store/SQLite/Migrations/M20260513_sent_profile_vector.hs @@ -1,5 +1,9 @@ {-# LANGUAGE QuasiQuotes #-} +-- Member profile dissemination in relay-mediated groups (task 001): +-- group_members.sent_profile_vector — per-recipient byte vector of "did this member's profile reach you?" +-- delivery_jobs.sender_group_member_ids — multi-sender batch: senders whose body fragments are in this job +-- Both columns serve the same feature; the migration name only references the primary column. module Simplex.Chat.Store.SQLite.Migrations.M20260513_sent_profile_vector where import Database.SQLite.Simple (Query)