mirror of
https://github.com/simplex-chat/simplex-chat.git
synced 2026-07-02 13:31:51 +00:00
wip
This commit is contained in:
@@ -2551,9 +2551,11 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
||||
|
||||
xInfoMember :: GroupInfo -> GroupMember -> Profile -> RcvMessage -> UTCTime -> CM (Maybe DeliveryJobScope)
|
||||
xInfoMember gInfo m p' msg brokerTs = do
|
||||
void $ processMemberProfileUpdate gInfo m p' (Just (msg, brokerTs))
|
||||
(_, profileChanged) <- processMemberProfileUpdate gInfo m p' (Just (msg, brokerTs))
|
||||
-- Relay-side: re-disseminate this member's profile on next forwarded message.
|
||||
when (useRelays' gInfo && isRelay (membership gInfo)) $
|
||||
-- Gate on profileChanged so a benign re-broadcast (identical profile) doesn't
|
||||
-- wipe the vector and force a channel-wide re-prepend on the next forward.
|
||||
when (profileChanged && useRelays' gInfo && isRelay (membership gInfo)) $
|
||||
withStore' $ \db -> clearSentProfileVector db (groupMemberId' m)
|
||||
pure $ memberEventDeliveryScope m
|
||||
|
||||
@@ -2562,7 +2564,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
||||
xGrpLinkMemReceived <- withStore $ \db -> getXGrpLinkMemReceived db groupMemberId
|
||||
if (viaGroupLink || isJust businessChat) && isNothing (memberContactId m) && memberCategory == GCHostMember && not xGrpLinkMemReceived
|
||||
then do
|
||||
m' <- processMemberProfileUpdate gInfo m p' Nothing
|
||||
(m', _) <- processMemberProfileUpdate gInfo m p' Nothing
|
||||
withStore' $ \db -> setXGrpLinkMemReceived db groupMemberId True
|
||||
let connectedIncognito = memberIncognito membership
|
||||
probeMatchingMemberContact m' connectedIncognito
|
||||
@@ -2626,7 +2628,11 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
||||
where
|
||||
expectHistory = groupFeatureAllowed SGFHistory gInfo && m `supportsVersion` groupHistoryIncludeWelcomeVersion
|
||||
|
||||
processMemberProfileUpdate :: GroupInfo -> GroupMember -> Profile -> Maybe (RcvMessage, UTCTime) -> CM GroupMember
|
||||
-- Returns (resulting member, profileChanged). profileChanged is True only when
|
||||
-- the stored profile was actually replaced — callers that act on a change
|
||||
-- (e.g. xInfoMember clearing the sent-profile vector) gate on the bool to
|
||||
-- avoid wasted work on replays / bounces / rejected updates.
|
||||
processMemberProfileUpdate :: GroupInfo -> GroupMember -> Profile -> Maybe (RcvMessage, UTCTime) -> CM (GroupMember, Bool)
|
||||
processMemberProfileUpdate gInfo m@GroupMember {memberProfile = p, memberContactId} p' msgTs_
|
||||
| redactedMemberProfile allowSimplexLinks (fromLocalProfile p) /= redactedMemberProfile allowSimplexLinks p' = do
|
||||
updateBusinessChatProfile gInfo
|
||||
@@ -2636,7 +2642,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
||||
unless (muteEventInChannel gInfo m') $ do
|
||||
forM_ msgTs_ $ createProfileUpdatedItem m'
|
||||
toView $ CEvtGroupMemberUpdated user gInfo m m'
|
||||
pure m'
|
||||
pure (m', True)
|
||||
Just mContactId -> do
|
||||
mCt <- withStore $ \db -> getContact db vr user mContactId
|
||||
if canUpdateProfile mCt
|
||||
@@ -2646,8 +2652,8 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
||||
forM_ msgTs_ $ createProfileUpdatedItem m'
|
||||
toView $ CEvtGroupMemberUpdated user gInfo m m'
|
||||
toView $ CEvtContactUpdated user mCt ct'
|
||||
pure m'
|
||||
else pure m
|
||||
pure (m', True)
|
||||
else pure (m, False) -- profile update rejected (active contact connection); no change
|
||||
where
|
||||
canUpdateProfile ct
|
||||
| not (contactActive ct) = True
|
||||
@@ -2655,7 +2661,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
||||
Nothing -> True
|
||||
Just conn -> not (connReady conn) || (authErrCounter conn >= 1)
|
||||
| otherwise =
|
||||
pure m
|
||||
pure (m, False)
|
||||
where
|
||||
allowSimplexLinks = groupFeatureMemberAllowed SGFSimplexLinks m gInfo
|
||||
updateBusinessChatProfile g@GroupInfo {businessChat} = case businessChat of
|
||||
@@ -3693,11 +3699,23 @@ runDeliveryJobWorker a deliveryKey Worker {doWork} = do
|
||||
-- against a consistent point-in-time view. Without this, an xInfoMember
|
||||
-- racing the worker could yield (new profile, old vector) or (old
|
||||
-- profile, cleared vector), making partitioning meaningless.
|
||||
senderProfiles <- withStore $ \db ->
|
||||
forM allSenderGMIds $ \sId -> do
|
||||
sender <- getGroupMemberById db vr user sId
|
||||
vec <- liftIO $ getSentProfileVector db sId
|
||||
pure (sender, vec)
|
||||
--
|
||||
-- Missing senders (deleted between job creation and execution, e.g. via
|
||||
-- a concurrent XGrpMemDel) are skipped rather than aborting the whole
|
||||
-- job: their profile cannot be disseminated, but the rest of the batch
|
||||
-- ships normally. Recipients see "unknown member" for the missing
|
||||
-- sender's forwarded content only in this batch.
|
||||
senderProfiles <- withStore' $ \db ->
|
||||
fmap catMaybes . forM allSenderGMIds $ \sId -> do
|
||||
sender_ <- runExceptT $ getGroupMemberById db vr user sId
|
||||
case sender_ of
|
||||
Right sender -> do
|
||||
vec <- getSentProfileVector db sId
|
||||
pure $ Just (sender, vec)
|
||||
Left _ -> pure Nothing
|
||||
let missingSenders = length allSenderGMIds - length senderProfiles
|
||||
when (missingSenders > 0) $
|
||||
logInfo $ "delivery job " <> tshow jobId <> ": " <> tshow missingSenders <> " senders missing; skipping their profile prepend"
|
||||
-- extBody captures each sender's profile at job-start. If the profile
|
||||
-- is updated mid-job (xInfoMember clears the vector and queues XInfo
|
||||
-- as its own job), recipients of this job receive the pre-update
|
||||
@@ -3710,15 +3728,32 @@ runDeliveryJobWorker a deliveryKey Worker {doWork} = do
|
||||
fwdBrokerTs <- liftIO getCurrentTime
|
||||
let profileElements = map (\(sender, _) -> encodeMemberProfileElement vr gInfo sender fwdBrokerTs) senderProfiles
|
||||
pure $ foldr prependBatchElement body profileElements
|
||||
sendLoop bucketSize startingCursor senderProfiles extBody
|
||||
-- If prepending pushed extBody past the agent's per-message ceiling,
|
||||
-- the agent would reject delivery to the "needs profiles" recipients
|
||||
-- AND we'd still mark their vectors as if delivery had succeeded —
|
||||
-- silently locking them out of future dissemination for these senders.
|
||||
-- Fall back: deliver bare body to everyone in this job, skip the marks,
|
||||
-- and let the next forwarded message (likely smaller, since body filled
|
||||
-- the budget here) re-attempt the prepend.
|
||||
let extBodyOverflows = B.length extBody > maxEncodedMsgLength
|
||||
prependProfiles = not (null senderProfiles) && not extBodyOverflows
|
||||
when extBodyOverflows $
|
||||
logInfo $
|
||||
"delivery job "
|
||||
<> tshow jobId
|
||||
<> ": extBody "
|
||||
<> tshow (B.length extBody)
|
||||
<> " bytes exceeds maxEncodedMsgLength "
|
||||
<> tshow maxEncodedMsgLength
|
||||
<> "; falling back to bare body for all recipients in this job, skipping vector marks"
|
||||
sendLoop bucketSize startingCursor senderProfiles extBody prependProfiles
|
||||
where
|
||||
sendLoop :: Int -> Maybe GroupMemberId -> [(GroupMember, ByteString)] -> ByteString -> CM ()
|
||||
sendLoop bucketSize cursorGMId_ senderProfiles extBody = do
|
||||
sendLoop :: Int -> Maybe GroupMemberId -> [(GroupMember, ByteString)] -> ByteString -> Bool -> CM ()
|
||||
sendLoop bucketSize cursorGMId_ senderProfiles extBody prependProfiles = do
|
||||
mems <- withStore' $ \db -> getGroupMembersByCursor db vr user gInfo cursorGMId_ singleSenderGMId_ bucketSize
|
||||
unless (null mems) $ do
|
||||
if null senderProfiles
|
||||
then deliver body mems
|
||||
else do
|
||||
if prependProfiles
|
||||
then do
|
||||
let knowsAll m = all (\(_, vec) -> isProfileSentTo vec (indexInGroup m)) senderProfiles
|
||||
(hasAllProfiles, needsProfiles) = partition knowsAll mems
|
||||
unless (null needsProfiles) $ deliver extBody needsProfiles
|
||||
@@ -3730,9 +3765,14 @@ runDeliveryJobWorker a deliveryKey Worker {doWork} = do
|
||||
withStore' $ \db ->
|
||||
forM_ senderProfiles $ \(sender, _) ->
|
||||
markProfilesSentToMembers db (groupMemberId' sender) readyIdxs
|
||||
else
|
||||
-- No dissemination this job: either no senders to disseminate,
|
||||
-- or the prepended bytes would exceed the agent's per-message
|
||||
-- ceiling. Vectors stay unchanged so the next forward retries.
|
||||
deliver body mems
|
||||
let cursorGMId' = groupMemberId' $ last mems
|
||||
withStore' $ \db -> updateDeliveryJobCursor db jobId cursorGMId'
|
||||
unless (length mems < bucketSize) $ sendLoop bucketSize (Just cursorGMId') senderProfiles extBody
|
||||
unless (length mems < bucketSize) $ sendLoop bucketSize (Just cursorGMId') senderProfiles extBody prependProfiles
|
||||
DJSMemberSupport scopeGMId -> do
|
||||
-- for member support scope we just load all recipients in one go, without cursor
|
||||
modMs <- withStore' $ \db -> getGroupModerators db vr user gInfo
|
||||
|
||||
@@ -250,6 +250,7 @@ chatGroupTests = do
|
||||
it "sender should deduplicate their own messages" testChannelsSenderDeduplicateOwn
|
||||
it "late joiner (no prior history) learns sender on first forward" testChannelLateJoinerNoHistoryReceivesProfile
|
||||
it "multi senders disseminate independently" testChannelMultiSendersIndependentDissemination
|
||||
it "extBody overflow falls back to bare body and skips vector mark" testChannelExtBodyOverflowSkipsVectorMark
|
||||
describe "multiple relays" $ do
|
||||
it "2 relays: should deliver messages to members" testChannels2RelaysDeliver
|
||||
it "should share same incognito profile with all relays" testChannels2RelaysIncognito
|
||||
@@ -8842,6 +8843,62 @@ testChannelLateJoinerNoHistoryReceivesProfile ps =
|
||||
alice <# "#team cath> hi again [>>]"
|
||||
dan <# "#team cath> hi again [>>]"
|
||||
|
||||
-- Regression test for: prepended XGrpMemNew can push extBody past maxEncodedMsgLength,
|
||||
-- causing the agent to reject delivery to "needs profiles" recipients while the worker
|
||||
-- still marks their sent_profile_vector — silently locking them out of future
|
||||
-- dissemination for that sender. With the fix, the worker falls back to delivering the
|
||||
-- bare body (no prepend) and skips the vector mark, so the next forward retries.
|
||||
testChannelExtBodyOverflowSkipsVectorMark :: HasCallStack => TestParams -> IO ()
|
||||
testChannelExtBodyOverflowSkipsVectorMark ps =
|
||||
withNewTestChat ps "alice" aliceProfile $ \alice -> do
|
||||
withNewTestChatOpts ps relayTestOpts "bob" bobProfile $ \bob -> do
|
||||
withNewTestChat ps "cath" cathProfile $ \cath -> do
|
||||
withNewTestChat ps "dan" danProfile $ \dan -> do
|
||||
(shortLink, fullLink) <- prepareChannel1Relay "team" alice bob
|
||||
memberJoinChannel "team" [bob] [alice] shortLink fullLink cath
|
||||
memberJoinChannel "team" [bob] [alice] shortLink fullLink dan
|
||||
|
||||
-- Inflate cath's profile image directly on the relay's database so the
|
||||
-- prepended XGrpMemNew element for her overflows maxEncodedMsgLength even
|
||||
-- with a tiny forwarded body. This bypasses the chat client's profile-size
|
||||
-- validation; we're exercising the relay's overflow path, not the validator.
|
||||
let bigImage = T.pack ("data:image/png;base64," <> replicate 16000 'A')
|
||||
withCCTransaction bob $ \db ->
|
||||
DB.execute
|
||||
db
|
||||
"UPDATE contact_profiles SET image = ? WHERE display_name = ?"
|
||||
(bigImage, "cath" :: T.Text)
|
||||
|
||||
-- cath sends a small message. The relay loads cath's (now huge) profile from
|
||||
-- its DB and tries to prepend it; extBody exceeds maxEncodedMsgLength. The
|
||||
-- worker falls back to delivering the bare body to all recipients in this
|
||||
-- job AND skips the vector marks. dan learns nothing about cath from this
|
||||
-- batch — sees the familiar "unknown member" line — but recovery is not
|
||||
-- locked out for future forwards.
|
||||
cath #> "#team hi"
|
||||
bob <# "#team cath> hi"
|
||||
-- alice already knows cath via the join-time XGrpMemNew; her view is unaffected.
|
||||
alice <# "#team cath> hi [>>]"
|
||||
-- dan never received an XGrpMemNew for cath (no prepend in the overflowing batch).
|
||||
dan <## "#team: bob forwarded a message from an unknown member, creating unknown member record cath"
|
||||
dan <# "#team cath> hi [>>]"
|
||||
|
||||
-- Crucial regression assertion: cath.sent_profile_vector on the relay (bob)
|
||||
-- is empty. Without the fix, the worker would have marked dan.idx (and
|
||||
-- alice.idx) after the agent-rejected delivery, locking dan out of future
|
||||
-- dissemination — the bug the fix closes.
|
||||
checkSentProfileVectorEmpty bob "cath"
|
||||
where
|
||||
checkSentProfileVectorEmpty :: HasCallStack => TestCC -> T.Text -> IO ()
|
||||
checkSentProfileVectorEmpty cc senderName = do
|
||||
lens <- withCCTransaction cc $ \db ->
|
||||
DB.query
|
||||
db
|
||||
"SELECT length(sent_profile_vector) FROM group_members WHERE local_display_name = ?"
|
||||
(Only senderName) ::
|
||||
IO [Only Int]
|
||||
map fromOnly lens `shouldBe` [0]
|
||||
|
||||
testChannelMultiSendersIndependentDissemination :: HasCallStack => TestParams -> IO ()
|
||||
testChannelMultiSendersIndependentDissemination ps =
|
||||
withNewTestChat ps "alice" aliceProfile $ \alice -> do
|
||||
|
||||
Reference in New Issue
Block a user