mirror of
https://github.com/simplex-chat/simplex-chat.git
synced 2026-07-02 04:51:49 +00:00
wip
This commit is contained in:
@@ -1186,9 +1186,11 @@ serveRoster :: User -> GroupInfo -> GroupMember -> CM ()
|
||||
serveRoster user gInfo member =
|
||||
when (member `supportsVersion` groupRosterVersion) $ do
|
||||
cxt <- chatStoreCxt
|
||||
-- read the served version with the blob, so the recorded served version matches what is served
|
||||
withStore' (\db -> (,) <$> getGroupRoster db gInfo <*> getGroupRosterVersion db gInfo) >>= \case
|
||||
(Just (ownerGMId, brokerTs, sm@SignedMsg {signedBody}, blob_), rosterVer_) ->
|
||||
-- read the stored blob's own version, not roster_version: roster_version is the acceptance gate (highest
|
||||
-- version accepted), which a delta can advance past the stored blob when the owner's blob send failed -
|
||||
-- recording it would over-claim what this member was actually served
|
||||
withStore' (\db -> (,) <$> getGroupRoster db gInfo <*> getStoredRosterVersion db gInfo) >>= \case
|
||||
(Just (ownerGMId, brokerTs, sm@SignedMsg {signedBody}, blob_), storedVer_) ->
|
||||
case J.eitherDecodeStrict' signedBody :: Either String (ChatMessage 'Json) of
|
||||
Left e -> logError $ "serveRoster: cannot decode saved roster message: " <> tshow e
|
||||
Right chatMsg@ChatMessage {msgId} ->
|
||||
@@ -1198,8 +1200,8 @@ serveRoster user gInfo member =
|
||||
sendFwdMemberMessage member fwd (VMSigned MSSVerified sm chatMsg)
|
||||
forM_ ((,) <$> msgId <*> blob_) $ \(sid, blob) ->
|
||||
sendInlineBlobChunks user gInfo [member] sid blob
|
||||
-- record the served version so a member can't re-trigger a full serve at a version it already got
|
||||
forM_ rosterVer_ $ \v -> withStore' $ \db -> setMemberRosterServedVersion db member v
|
||||
-- record the served (stored) version so a member can't re-trigger a full serve at a version it already got
|
||||
forM_ storedVer_ $ \v -> withStore' $ \db -> setMemberRosterServedVersion db member v
|
||||
Left e -> logError $ "serveRoster: roster owner not found: " <> tshow e
|
||||
(Nothing, _) -> pure ()
|
||||
|
||||
|
||||
@@ -3261,17 +3261,21 @@ processAgentMessageConn cxt user@User {userId} corrId agentConnId agentMessage =
|
||||
then (requestRosterOnGap gInfo fwdRelay_ cur v `catchAllErrors` eToView) >> action
|
||||
else messageWarning "x.grp.mem: roster version not newer than current, ignoring" $> Nothing
|
||||
|
||||
-- A subscriber that skipped versions (cur known and v > cur+1) asks the relay that forwarded this delta
|
||||
-- to re-serve the full roster, recovering the privileged set and keys carried by the missed versions.
|
||||
-- The request carries the pre-gap cur (the relay serves only what it holds newer). Best-effort: a failed
|
||||
-- request must not block applying the delta. Relays and the direct path don't request (fwdRelay_ is Nothing).
|
||||
-- A subscriber that skipped versions (no roster yet, or v above cur+1) asks the relay that forwarded this
|
||||
-- delta to re-serve the full roster, recovering the privileged set and keys carried by the missed versions.
|
||||
-- The request carries the subscriber's current version (Nothing if none); the relay serves only what it holds
|
||||
-- newer. Best-effort: a failed request must not block applying the delta. Relays and the direct path don't
|
||||
-- request (fwdRelay_ is Nothing), and a relay that predates roster support is skipped.
|
||||
requestRosterOnGap :: GroupInfo -> Maybe GroupMember -> Maybe VersionRoster -> VersionRoster -> CM ()
|
||||
requestRosterOnGap gInfo fwdRelay_ cur_ v
|
||||
| isUserGrpFwdRelay gInfo = pure ()
|
||||
| otherwise = case (fwdRelay_, cur_) of
|
||||
(Just relay, Just cur@(VersionRoster c))
|
||||
| v > VersionRoster (c + 1) -> void $ sendGroupMessage' user gInfo [relay] (XGrpRosterRequest cur)
|
||||
| otherwise = case fwdRelay_ of
|
||||
Just relay
|
||||
| gap, relay `supportsVersion` groupRosterVersion ->
|
||||
void $ sendGroupMessage' user gInfo [relay] (XGrpRosterRequest cur_)
|
||||
_ -> pure ()
|
||||
where
|
||||
gap = maybe True (\(VersionRoster c) -> v > VersionRoster (c + 1)) cur_
|
||||
|
||||
xGrpMemRole :: GroupInfo -> Maybe GroupMember -> GroupMember -> MemberId -> GroupMemberRole -> Maybe MemberKey -> Maybe VersionRoster -> RcvMessage -> UTCTime -> CM (Maybe DeliveryJobScope)
|
||||
xGrpMemRole gInfo@GroupInfo {membership} fwdRelay_ m@GroupMember {memberRole = senderRole} memId memRole memberKey_ rosterVer_ msg@RcvMessage {msgSigned} brokerTs
|
||||
@@ -3496,17 +3500,18 @@ processAgentMessageConn cxt user@User {userId} corrId agentConnId agentMessage =
|
||||
messageError $ "x.grp.roster.ack: relay could not save roster, marked rejected: " <> e
|
||||
_ -> pure ()
|
||||
|
||||
-- A relay re-serves the full roster to a subscriber that detected a version gap, but only when it holds
|
||||
-- something newer than BOTH the requester's claimed version and the version it last served this member -
|
||||
-- the latter bounds reflected amplification (a member can't re-trigger a full serve with reqVer = 0).
|
||||
-- A relay re-serves the full roster to a subscriber that detected a version gap, but only when its STORED
|
||||
-- blob is newer than BOTH the requester's version (Nothing = none) and the version it last served this member
|
||||
-- - the latter bounds reflected amplification (a member can't re-trigger a full serve). Gating on the stored
|
||||
-- blob (not roster_version, the gate) means the relay serves only a blob the requester will accept.
|
||||
-- serveRoster records the served version (on all serve paths) and is a no-op without a roster.
|
||||
xGrpRosterRequest :: GroupInfo -> GroupMember -> VersionRoster -> CM ()
|
||||
xGrpRosterRequest gInfo m reqVer =
|
||||
xGrpRosterRequest :: GroupInfo -> GroupMember -> Maybe VersionRoster -> CM ()
|
||||
xGrpRosterRequest gInfo m reqVer_ =
|
||||
when (isUserGrpFwdRelay gInfo) $ do
|
||||
(cur_, served_) <- withStore' $ \db ->
|
||||
(,) <$> getGroupRosterVersion db gInfo <*> getMemberRosterServedVersion db m
|
||||
forM_ cur_ $ \cur ->
|
||||
when (cur > reqVer && maybe True (cur >) served_) $ serveRoster user gInfo m
|
||||
(stored_, served_) <- withStore' $ \db ->
|
||||
(,) <$> getStoredRosterVersion db gInfo <*> getMemberRosterServedVersion db m
|
||||
forM_ stored_ $ \stored ->
|
||||
when (maybe True (stored >) reqVer_ && maybe True (stored >) served_) $ serveRoster user gInfo m
|
||||
|
||||
checkHostRole :: GroupMember -> GroupMemberRole -> CM ()
|
||||
checkHostRole GroupMember {memberRole, localDisplayName} memRole =
|
||||
|
||||
@@ -525,7 +525,7 @@ data ChatMsgEvent (e :: MsgEncoding) where
|
||||
XGrpDirectInv :: ConnReqInvitation -> Maybe MsgContent -> Maybe MsgScope -> ChatMsgEvent 'Json
|
||||
XGrpRoster :: GroupRoster -> ChatMsgEvent 'Json
|
||||
XGrpRosterAck :: VersionRoster -> Maybe Text -> ChatMsgEvent 'Json
|
||||
XGrpRosterRequest :: VersionRoster -> ChatMsgEvent 'Json
|
||||
XGrpRosterRequest :: Maybe VersionRoster -> ChatMsgEvent 'Json
|
||||
XGrpMsgForward :: GrpMsgForward -> ChatMessage 'Json -> ChatMsgEvent 'Json
|
||||
XInfoProbe :: Probe -> ChatMsgEvent 'Json
|
||||
XInfoProbeCheck :: ProbeHash -> ChatMsgEvent 'Json
|
||||
@@ -1449,7 +1449,7 @@ appJsonToCM AppMessageJson {v, msgId, event, params} = do
|
||||
XGrpDirectInv_ -> XGrpDirectInv <$> p "connReq" <*> opt "content" <*> opt "scope"
|
||||
XGrpRoster_ -> XGrpRoster <$> (GroupRoster <$> p "version" <*> p "fileInv")
|
||||
XGrpRosterAck_ -> XGrpRosterAck <$> p "version" <*> opt "error"
|
||||
XGrpRosterRequest_ -> XGrpRosterRequest <$> p "version"
|
||||
XGrpRosterRequest_ -> XGrpRosterRequest <$> opt "version"
|
||||
XGrpMsgForward_ -> do
|
||||
fwdSender <- opt "memberId" >>= \case
|
||||
Just memberId -> FwdMember memberId . fromMaybe "" <$> opt "memberName"
|
||||
@@ -1524,7 +1524,7 @@ chatToAppMessage chatMsg@ChatMessage {chatVRange, msgId, chatMsgEvent} = case en
|
||||
XGrpDirectInv connReq content scope -> o $ ("content" .=? content) $ ("scope" .=? scope) ["connReq" .= connReq]
|
||||
XGrpRoster GroupRoster {version, fileInv} -> o ["version" .= version, "fileInv" .= fileInv]
|
||||
XGrpRosterAck version err -> o $ ("error" .=? err) ["version" .= version]
|
||||
XGrpRosterRequest version -> o ["version" .= version]
|
||||
XGrpRosterRequest version -> o $ ("version" .=? version) []
|
||||
XGrpMsgForward GrpMsgForward {fwdSender, fwdBrokerTs} msg -> o $ encodeFwdSender fwdSender ["msg" .= msg, "msgTs" .= fwdBrokerTs]
|
||||
where
|
||||
encodeFwdSender = \case
|
||||
|
||||
@@ -90,6 +90,7 @@ module Simplex.Chat.Store.Groups
|
||||
getPublishableGroupRelays,
|
||||
setGroupRosterVersion,
|
||||
getGroupRosterVersion,
|
||||
getStoredRosterVersion,
|
||||
setMemberRosterServedVersion,
|
||||
getMemberRosterServedVersion,
|
||||
getGroupRoster,
|
||||
@@ -1480,6 +1481,14 @@ getGroupRosterVersion db GroupInfo {groupId} =
|
||||
fmap join . maybeFirstRow fromOnly $
|
||||
DB.query db "SELECT roster_version FROM groups WHERE group_id = ?" (Only groupId)
|
||||
|
||||
-- The version of the roster blob actually stored (written with the blob in setGroupLiveRoster), as opposed to
|
||||
-- roster_version (the acceptance gate). The owner sends the blob before the delta, so these normally match; a
|
||||
-- failed blob send leaves the gate (advanced by the delta) ahead of the stored blob until a later roster completes.
|
||||
getStoredRosterVersion :: DB.Connection -> GroupInfo -> IO (Maybe VersionRoster)
|
||||
getStoredRosterVersion db GroupInfo {groupId} =
|
||||
fmap join . maybeFirstRow fromOnly $
|
||||
DB.query db "SELECT stored_roster_version FROM groups WHERE group_id = ?" (Only groupId)
|
||||
|
||||
-- The newest roster version a relay re-served to this member on its catch-up request: bounds reflected
|
||||
-- amplification, so a member can't re-trigger a full serve at a version it was already served.
|
||||
setMemberRosterServedVersion :: DB.Connection -> GroupMember -> VersionRoster -> IO ()
|
||||
@@ -1591,6 +1600,10 @@ getRosterTransfer db fileId =
|
||||
|
||||
-- Write the single live roster on groups from a completed transfer's values (header NULL on a member,
|
||||
-- so its live roster_msg_* stay NULL and it never re-serves; only relays re-serve).
|
||||
-- Sets BOTH versions: a completed blob advances the gate (roster_version - refuse anything older, the downgrade
|
||||
-- protection for the no-delta join/new-relay/re-serve paths, where the blob is the only thing that sets the gate)
|
||||
-- and records the stored version (stored_roster_version - what is actually held and re-served). Deltas advance
|
||||
-- only the gate, so stored_roster_version <= roster_version always.
|
||||
setGroupLiveRoster :: DB.Connection -> GroupInfo -> VersionRoster -> GroupMemberId -> UTCTime -> Maybe SignedMsg -> ByteString -> IO ()
|
||||
setGroupLiveRoster db GroupInfo {groupId} v ownerGMId brokerTs sm_ blob = do
|
||||
currentTs <- getCurrentTime
|
||||
@@ -1598,13 +1611,13 @@ setGroupLiveRoster db GroupInfo {groupId} v ownerGMId brokerTs sm_ blob = do
|
||||
db
|
||||
[sql|
|
||||
UPDATE groups SET
|
||||
roster_version = ?, roster_blob = ?,
|
||||
roster_version = ?, stored_roster_version = ?, roster_blob = ?,
|
||||
roster_sending_owner_gm_id = ?, roster_broker_ts = ?,
|
||||
roster_msg_chat_binding = ?, roster_msg_signatures = ?, roster_msg_body = ?,
|
||||
updated_at = ?
|
||||
WHERE group_id = ?
|
||||
|]
|
||||
( (v, Binary blob, ownerGMId, brokerTs)
|
||||
( (v, v, Binary blob, ownerGMId, brokerTs)
|
||||
:. ((\SignedMsg {chatBinding} -> chatBinding) <$> sm_, (\SignedMsg {signatures} -> Binary (smpEncode signatures)) <$> sm_, (\SignedMsg {signedBody} -> Binary signedBody) <$> sm_, currentTs, groupId)
|
||||
)
|
||||
|
||||
|
||||
@@ -10,10 +10,12 @@ m20260629_member_roster_served :: Text
|
||||
m20260629_member_roster_served =
|
||||
[r|
|
||||
ALTER TABLE group_members ADD COLUMN roster_served_version BIGINT;
|
||||
ALTER TABLE groups ADD COLUMN stored_roster_version BIGINT;
|
||||
|]
|
||||
|
||||
down_m20260629_member_roster_served :: Text
|
||||
down_m20260629_member_roster_served =
|
||||
[r|
|
||||
ALTER TABLE group_members DROP COLUMN roster_served_version;
|
||||
ALTER TABLE groups DROP COLUMN stored_roster_version;
|
||||
|]
|
||||
|
||||
@@ -9,10 +9,12 @@ m20260629_member_roster_served :: Query
|
||||
m20260629_member_roster_served =
|
||||
[sql|
|
||||
ALTER TABLE group_members ADD COLUMN roster_served_version INTEGER;
|
||||
ALTER TABLE groups ADD COLUMN stored_roster_version INTEGER;
|
||||
|]
|
||||
|
||||
down_m20260629_member_roster_served :: Query
|
||||
down_m20260629_member_roster_served =
|
||||
[sql|
|
||||
ALTER TABLE group_members DROP COLUMN roster_served_version;
|
||||
ALTER TABLE groups DROP COLUMN stored_roster_version;
|
||||
|]
|
||||
|
||||
Reference in New Issue
Block a user