From bb37fe5ee3e3dc01602e50d9aa7a1db59f863b71 Mon Sep 17 00:00:00 2001 From: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> Date: Fri, 15 May 2026 17:27:32 +0400 Subject: [PATCH] wip --- src/Simplex/Chat/Library/Subscriber.hs | 80 +++++++++++++++++++------- tests/ChatTests/Groups.hs | 57 ++++++++++++++++++ 2 files changed, 117 insertions(+), 20 deletions(-) diff --git a/src/Simplex/Chat/Library/Subscriber.hs b/src/Simplex/Chat/Library/Subscriber.hs index 0c478457e8..fa31854eb9 100644 --- a/src/Simplex/Chat/Library/Subscriber.hs +++ b/src/Simplex/Chat/Library/Subscriber.hs @@ -2551,9 +2551,11 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage = xInfoMember :: GroupInfo -> GroupMember -> Profile -> RcvMessage -> UTCTime -> CM (Maybe DeliveryJobScope) xInfoMember gInfo m p' msg brokerTs = do - void $ processMemberProfileUpdate gInfo m p' (Just (msg, brokerTs)) + (_, profileChanged) <- processMemberProfileUpdate gInfo m p' (Just (msg, brokerTs)) -- Relay-side: re-disseminate this member's profile on next forwarded message. - when (useRelays' gInfo && isRelay (membership gInfo)) $ + -- Gate on profileChanged so a benign re-broadcast (identical profile) doesn't + -- wipe the vector and force a channel-wide re-prepend on the next forward. + when (profileChanged && useRelays' gInfo && isRelay (membership gInfo)) $ withStore' $ \db -> clearSentProfileVector db (groupMemberId' m) pure $ memberEventDeliveryScope m @@ -2562,7 +2564,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage = xGrpLinkMemReceived <- withStore $ \db -> getXGrpLinkMemReceived db groupMemberId if (viaGroupLink || isJust businessChat) && isNothing (memberContactId m) && memberCategory == GCHostMember && not xGrpLinkMemReceived then do - m' <- processMemberProfileUpdate gInfo m p' Nothing + (m', _) <- processMemberProfileUpdate gInfo m p' Nothing withStore' $ \db -> setXGrpLinkMemReceived db groupMemberId True let connectedIncognito = memberIncognito membership probeMatchingMemberContact m' connectedIncognito @@ -2626,7 +2628,11 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage = where expectHistory = groupFeatureAllowed SGFHistory gInfo && m `supportsVersion` groupHistoryIncludeWelcomeVersion - processMemberProfileUpdate :: GroupInfo -> GroupMember -> Profile -> Maybe (RcvMessage, UTCTime) -> CM GroupMember + -- Returns (resulting member, profileChanged). profileChanged is True only when + -- the stored profile was actually replaced — callers that act on a change + -- (e.g. xInfoMember clearing the sent-profile vector) gate on the bool to + -- avoid wasted work on replays / bounces / rejected updates. + processMemberProfileUpdate :: GroupInfo -> GroupMember -> Profile -> Maybe (RcvMessage, UTCTime) -> CM (GroupMember, Bool) processMemberProfileUpdate gInfo m@GroupMember {memberProfile = p, memberContactId} p' msgTs_ | redactedMemberProfile allowSimplexLinks (fromLocalProfile p) /= redactedMemberProfile allowSimplexLinks p' = do updateBusinessChatProfile gInfo @@ -2636,7 +2642,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage = unless (muteEventInChannel gInfo m') $ do forM_ msgTs_ $ createProfileUpdatedItem m' toView $ CEvtGroupMemberUpdated user gInfo m m' - pure m' + pure (m', True) Just mContactId -> do mCt <- withStore $ \db -> getContact db vr user mContactId if canUpdateProfile mCt @@ -2646,8 +2652,8 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage = forM_ msgTs_ $ createProfileUpdatedItem m' toView $ CEvtGroupMemberUpdated user gInfo m m' toView $ CEvtContactUpdated user mCt ct' - pure m' - else pure m + pure (m', True) + else pure (m, False) -- profile update rejected (active contact connection); no change where canUpdateProfile ct | not (contactActive ct) = True @@ -2655,7 +2661,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage = Nothing -> True Just conn -> not (connReady conn) || (authErrCounter conn >= 1) | otherwise = - pure m + pure (m, False) where allowSimplexLinks = groupFeatureMemberAllowed SGFSimplexLinks m gInfo updateBusinessChatProfile g@GroupInfo {businessChat} = case businessChat of @@ -3693,11 +3699,23 @@ runDeliveryJobWorker a deliveryKey Worker {doWork} = do -- against a consistent point-in-time view. Without this, an xInfoMember -- racing the worker could yield (new profile, old vector) or (old -- profile, cleared vector), making partitioning meaningless. - senderProfiles <- withStore $ \db -> - forM allSenderGMIds $ \sId -> do - sender <- getGroupMemberById db vr user sId - vec <- liftIO $ getSentProfileVector db sId - pure (sender, vec) + -- + -- Missing senders (deleted between job creation and execution, e.g. via + -- a concurrent XGrpMemDel) are skipped rather than aborting the whole + -- job: their profile cannot be disseminated, but the rest of the batch + -- ships normally. Recipients see "unknown member" for the missing + -- sender's forwarded content only in this batch. + senderProfiles <- withStore' $ \db -> + fmap catMaybes . forM allSenderGMIds $ \sId -> do + sender_ <- runExceptT $ getGroupMemberById db vr user sId + case sender_ of + Right sender -> do + vec <- getSentProfileVector db sId + pure $ Just (sender, vec) + Left _ -> pure Nothing + let missingSenders = length allSenderGMIds - length senderProfiles + when (missingSenders > 0) $ + logInfo $ "delivery job " <> tshow jobId <> ": " <> tshow missingSenders <> " senders missing; skipping their profile prepend" -- extBody captures each sender's profile at job-start. If the profile -- is updated mid-job (xInfoMember clears the vector and queues XInfo -- as its own job), recipients of this job receive the pre-update @@ -3710,15 +3728,32 @@ runDeliveryJobWorker a deliveryKey Worker {doWork} = do fwdBrokerTs <- liftIO getCurrentTime let profileElements = map (\(sender, _) -> encodeMemberProfileElement vr gInfo sender fwdBrokerTs) senderProfiles pure $ foldr prependBatchElement body profileElements - sendLoop bucketSize startingCursor senderProfiles extBody + -- If prepending pushed extBody past the agent's per-message ceiling, + -- the agent would reject delivery to the "needs profiles" recipients + -- AND we'd still mark their vectors as if delivery had succeeded — + -- silently locking them out of future dissemination for these senders. + -- Fall back: deliver bare body to everyone in this job, skip the marks, + -- and let the next forwarded message (likely smaller, since body filled + -- the budget here) re-attempt the prepend. + let extBodyOverflows = B.length extBody > maxEncodedMsgLength + prependProfiles = not (null senderProfiles) && not extBodyOverflows + when extBodyOverflows $ + logInfo $ + "delivery job " + <> tshow jobId + <> ": extBody " + <> tshow (B.length extBody) + <> " bytes exceeds maxEncodedMsgLength " + <> tshow maxEncodedMsgLength + <> "; falling back to bare body for all recipients in this job, skipping vector marks" + sendLoop bucketSize startingCursor senderProfiles extBody prependProfiles where - sendLoop :: Int -> Maybe GroupMemberId -> [(GroupMember, ByteString)] -> ByteString -> CM () - sendLoop bucketSize cursorGMId_ senderProfiles extBody = do + sendLoop :: Int -> Maybe GroupMemberId -> [(GroupMember, ByteString)] -> ByteString -> Bool -> CM () + sendLoop bucketSize cursorGMId_ senderProfiles extBody prependProfiles = do mems <- withStore' $ \db -> getGroupMembersByCursor db vr user gInfo cursorGMId_ singleSenderGMId_ bucketSize unless (null mems) $ do - if null senderProfiles - then deliver body mems - else do + if prependProfiles + then do let knowsAll m = all (\(_, vec) -> isProfileSentTo vec (indexInGroup m)) senderProfiles (hasAllProfiles, needsProfiles) = partition knowsAll mems unless (null needsProfiles) $ deliver extBody needsProfiles @@ -3730,9 +3765,14 @@ runDeliveryJobWorker a deliveryKey Worker {doWork} = do withStore' $ \db -> forM_ senderProfiles $ \(sender, _) -> markProfilesSentToMembers db (groupMemberId' sender) readyIdxs + else + -- No dissemination this job: either no senders to disseminate, + -- or the prepended bytes would exceed the agent's per-message + -- ceiling. Vectors stay unchanged so the next forward retries. + deliver body mems let cursorGMId' = groupMemberId' $ last mems withStore' $ \db -> updateDeliveryJobCursor db jobId cursorGMId' - unless (length mems < bucketSize) $ sendLoop bucketSize (Just cursorGMId') senderProfiles extBody + unless (length mems < bucketSize) $ sendLoop bucketSize (Just cursorGMId') senderProfiles extBody prependProfiles DJSMemberSupport scopeGMId -> do -- for member support scope we just load all recipients in one go, without cursor modMs <- withStore' $ \db -> getGroupModerators db vr user gInfo diff --git a/tests/ChatTests/Groups.hs b/tests/ChatTests/Groups.hs index b4dac44f38..c8b75ed1ef 100644 --- a/tests/ChatTests/Groups.hs +++ b/tests/ChatTests/Groups.hs @@ -250,6 +250,7 @@ chatGroupTests = do it "sender should deduplicate their own messages" testChannelsSenderDeduplicateOwn it "late joiner (no prior history) learns sender on first forward" testChannelLateJoinerNoHistoryReceivesProfile it "multi senders disseminate independently" testChannelMultiSendersIndependentDissemination + it "extBody overflow falls back to bare body and skips vector mark" testChannelExtBodyOverflowSkipsVectorMark describe "multiple relays" $ do it "2 relays: should deliver messages to members" testChannels2RelaysDeliver it "should share same incognito profile with all relays" testChannels2RelaysIncognito @@ -8842,6 +8843,62 @@ testChannelLateJoinerNoHistoryReceivesProfile ps = alice <# "#team cath> hi again [>>]" dan <# "#team cath> hi again [>>]" +-- Regression test for: prepended XGrpMemNew can push extBody past maxEncodedMsgLength, +-- causing the agent to reject delivery to "needs profiles" recipients while the worker +-- still marks their sent_profile_vector — silently locking them out of future +-- dissemination for that sender. With the fix, the worker falls back to delivering the +-- bare body (no prepend) and skips the vector mark, so the next forward retries. +testChannelExtBodyOverflowSkipsVectorMark :: HasCallStack => TestParams -> IO () +testChannelExtBodyOverflowSkipsVectorMark ps = + withNewTestChat ps "alice" aliceProfile $ \alice -> do + withNewTestChatOpts ps relayTestOpts "bob" bobProfile $ \bob -> do + withNewTestChat ps "cath" cathProfile $ \cath -> do + withNewTestChat ps "dan" danProfile $ \dan -> do + (shortLink, fullLink) <- prepareChannel1Relay "team" alice bob + memberJoinChannel "team" [bob] [alice] shortLink fullLink cath + memberJoinChannel "team" [bob] [alice] shortLink fullLink dan + + -- Inflate cath's profile image directly on the relay's database so the + -- prepended XGrpMemNew element for her overflows maxEncodedMsgLength even + -- with a tiny forwarded body. This bypasses the chat client's profile-size + -- validation; we're exercising the relay's overflow path, not the validator. + let bigImage = T.pack ("data:image/png;base64," <> replicate 16000 'A') + withCCTransaction bob $ \db -> + DB.execute + db + "UPDATE contact_profiles SET image = ? WHERE display_name = ?" + (bigImage, "cath" :: T.Text) + + -- cath sends a small message. The relay loads cath's (now huge) profile from + -- its DB and tries to prepend it; extBody exceeds maxEncodedMsgLength. The + -- worker falls back to delivering the bare body to all recipients in this + -- job AND skips the vector marks. dan learns nothing about cath from this + -- batch — sees the familiar "unknown member" line — but recovery is not + -- locked out for future forwards. + cath #> "#team hi" + bob <# "#team cath> hi" + -- alice already knows cath via the join-time XGrpMemNew; her view is unaffected. + alice <# "#team cath> hi [>>]" + -- dan never received an XGrpMemNew for cath (no prepend in the overflowing batch). + dan <## "#team: bob forwarded a message from an unknown member, creating unknown member record cath" + dan <# "#team cath> hi [>>]" + + -- Crucial regression assertion: cath.sent_profile_vector on the relay (bob) + -- is empty. Without the fix, the worker would have marked dan.idx (and + -- alice.idx) after the agent-rejected delivery, locking dan out of future + -- dissemination — the bug the fix closes. + checkSentProfileVectorEmpty bob "cath" + where + checkSentProfileVectorEmpty :: HasCallStack => TestCC -> T.Text -> IO () + checkSentProfileVectorEmpty cc senderName = do + lens <- withCCTransaction cc $ \db -> + DB.query + db + "SELECT length(sent_profile_vector) FROM group_members WHERE local_display_name = ?" + (Only senderName) :: + IO [Only Int] + map fromOnly lens `shouldBe` [0] + testChannelMultiSendersIndependentDissemination :: HasCallStack => TestParams -> IO () testChannelMultiSendersIndependentDissemination ps = withNewTestChat ps "alice" aliceProfile $ \alice -> do