core: forward based on relations vector (#6464)

* core: forward based on relations vector wip

* fix introductions

* fix forwarding tests

* fix forwarding inside support scope

* fix deduplication test

* fix more tests

* plans, api

* live migration wip

* enable tests

* member locks

* api

* plans

* fix for postgres

* fix for postgres

* rename predicate

* rename predicate

* optimize

* refactor

* fix

* check

* move part of migration to sql

* plans

* core: preserve detailed information in relation vectors (#6484)

* core: relations vector live migrations; stage 2 migration sql (#6472)

* rework forwarding in support scope

* move operations inside transactions

* set_member_vector_new_relation function

* read vector ad-hoc

* partition in transaction

* fix postgres

* postgres schema

* api

* plans

* remove comment

* lock before migration computation

* refactor

* simplify set relations

* retrieve only support scope members

* fix

* refactor

* fix comment

* enable tests

* 1 second

* for update

* locks

* fix mask

* plans

* fix

* postgres

---------

Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>
This commit is contained in:
spaced4ndy
2025-12-09 14:48:32 +00:00
committed by GitHub
parent 1a22a34c41
commit f76e994034
30 changed files with 1260 additions and 497 deletions
@@ -22,6 +22,7 @@ import Simplex.Chat.Store.Postgres.Migrations.M20250922_remove_unused_connection
import Simplex.Chat.Store.Postgres.Migrations.M20251007_connections_sync
import Simplex.Chat.Store.Postgres.Migrations.M20251017_chat_tags_cascade
import Simplex.Chat.Store.Postgres.Migrations.M20251117_member_relations_vector
-- import Simplex.Chat.Store.Postgres.Migrations.M20251128_member_relations_vector_stage_2
import Simplex.Messaging.Agent.Store.Shared (Migration (..))
schemaMigrations :: [(String, Text, Maybe Text)]
@@ -44,6 +45,7 @@ schemaMigrations =
("20251007_connections_sync", m20251007_connections_sync, Just down_m20251007_connections_sync),
("20251017_chat_tags_cascade", m20251017_chat_tags_cascade, Just down_m20251017_chat_tags_cascade),
("20251117_member_relations_vector", m20251117_member_relations_vector, Just down_m20251117_member_relations_vector)
-- ("20251128_member_relations_vector_stage_2", m20251128_member_relations_vector_stage_2, Just down_m20251128_member_relations_vector_stage_2)
]
-- | The list of migrations in ascending order by date
@@ -6,10 +6,91 @@ import Data.Text (Text)
import qualified Data.Text as T
import Text.RawString.QQ (r)
-- This migration creates the function set_member_vector_new_relation(v, idx, direction, status)
-- and the custom aggregate function migrate_relations_vector(idx, direction, intro_status).
-- The aggregate is used in live migration and stage 2 migration (M20251128_member_relations_vector_stage_2).
--
-- Vector byte encoding: 4 reserved | 1 direction | 3 status
-- Direction: 0 = IDSubjectIntroduced, 1 = IDReferencedIntroduced
-- Status values: 0 = MRNew, 1 = MRIntroduced, 2 = MRSubjectConnected, 3 = MRReferencedConnected, 4 = MRConnected
--
-- The aggregate transforms intro_status into relation status:
-- - intro_status 'new'/'sent'/'rcv'/'fwd': MRIntroduced (1)
-- - intro_status 're-con': if direction=0 then MRSubjectConnected (2), else MRReferencedConnected (3)
-- - intro_status 'to-con': if direction=0 then MRReferencedConnected (3), else MRSubjectConnected (2)
-- - intro_status 'con': MRConnected (4)
--
-- Final byte combines direction and status: byte = (direction << 3) | status
m20251117_member_relations_vector :: Text
m20251117_member_relations_vector =
T.pack
[r|
-- set_member_vector_new_relation(v, idx, direction, status)
-- Returns a copy of relations vector v with the byte at zero-based position idx set to
--   (old_byte & 0xF0) | (direction * 8) | status
-- i.e. the high four bits of an existing byte are kept and direction/status are written below them.
--   * idx < 0: v is returned unchanged.
--   * idx at or beyond the end of v: v is first extended with zero bytes to length idx + 1,
--     and the old byte is taken as 0.
-- Inputs are not range-checked: with direction outside 0..1 or status outside 0..7 the OR would
-- spill into neighbouring bits of the byte.
-- NOTE(review): a NULL v appears to yield NULL (length(NULL) is NULL, so no padding happens and
-- set_byte receives NULL) - confirm callers never pass a NULL vector.
CREATE FUNCTION set_member_vector_new_relation(v BYTEA, idx BIGINT, direction INT, status INT)
RETURNS BYTEA AS $$
DECLARE
new_len INT;
result BYTEA;
byte_val INT;
old_byte INT;
BEGIN
IF idx < 0 THEN
RETURN v;
END IF;
IF idx < length(v) THEN
old_byte := get_byte(v, idx::INT);
ELSE
old_byte := 0;
END IF;
byte_val := (old_byte & x'F0'::INT) | (direction * 8) | status;
new_len := GREATEST(length(v), idx + 1);
IF new_len > length(v) THEN
result := v || (SELECT string_agg('\x00'::BYTEA, ''::BYTEA) FROM generate_series(1, new_len - length(v)));
ELSE
result := v;
END IF;
result := set_byte(result, idx::INT, byte_val);
RETURN result;
END;
$$ LANGUAGE plpgsql IMMUTABLE;
-- migrate_relations_vector_step(state, idx, direction, intro_status)
-- State transition function of the migrate_relations_vector aggregate.
-- Converts intro_status into a numeric relation status and writes the byte
-- (direction * 8) + status at zero-based position idx of state:
--   * 're-con': 2 when direction = 0, otherwise 3
--   * 'to-con': 3 when direction = 0, otherwise 2
--   * 'con':    4
--   * any other value (the ELSE branch): 1
--   * idx < 0: state is returned unchanged.
--   * idx at or beyond the end of state: state is first extended with zero bytes to length idx + 1.
-- The whole byte is overwritten; unlike set_member_vector_new_relation, no bits of a previous
-- value at idx are kept, so when two input rows share an idx the one processed last wins.
CREATE FUNCTION migrate_relations_vector_step(state BYTEA, idx BIGINT, direction INT, intro_status TEXT)
RETURNS BYTEA AS $$
DECLARE
new_len INT;
result BYTEA;
status INT;
byte_val INT;
BEGIN
IF idx < 0 THEN
RETURN state;
END IF;
IF intro_status = 're-con' THEN
IF direction = 0 THEN status := 2; ELSE status := 3; END IF;
ELSIF intro_status = 'to-con' THEN
IF direction = 0 THEN status := 3; ELSE status := 2; END IF;
ELSIF intro_status = 'con' THEN
status := 4;
ELSE
status := 1;
END IF;
byte_val := (direction * 8) + status;
new_len := GREATEST(length(state), idx + 1);
IF new_len > length(state) THEN
result := state || (SELECT string_agg('\x00'::BYTEA, ''::BYTEA) FROM generate_series(1, new_len - length(state)));
ELSE
result := state;
END IF;
result := set_byte(result, idx::INT, byte_val);
RETURN result;
END;
$$ LANGUAGE plpgsql IMMUTABLE;
-- Aggregate over (idx, direction, intro_status) rows: starts from an empty BYTEA state and applies
-- migrate_relations_vector_step to each input row. There is no final function, so the result is the
-- accumulated state (the empty initial value when there are no input rows).
CREATE AGGREGATE migrate_relations_vector(BIGINT, INT, TEXT) (
SFUNC = migrate_relations_vector_step,
STYPE = BYTEA,
INITCOND = ''
);
-- index_in_group: per-member number; the stage 2 migration reads it as the byte position (idx)
-- of a member inside relations vectors. Existing rows start at the default 0.
ALTER TABLE group_members ADD COLUMN index_in_group BIGINT NOT NULL DEFAULT 0;
-- member_index: per-group value, default 0.
-- NOTE(review): presumably the counter from which index_in_group values are assigned - confirm.
ALTER TABLE groups ADD COLUMN member_index BIGINT NOT NULL DEFAULT 0;
@@ -46,12 +127,28 @@ SET member_index = COALESCE((
FROM group_members
WHERE group_members.group_id = g.group_id
), 0);
-- Set an empty relations vector on every member of each group that contains a row with
-- member_category = 'user' whose role is neither admin nor owner, or whose status is
-- removed, left or deleted.
UPDATE group_members AS gm
SET member_relations_vector = ''::BYTEA
WHERE EXISTS (
  SELECT 1
  FROM group_members AS user_member
  WHERE user_member.group_id = gm.group_id
    AND user_member.member_category = 'user'
    AND (
      user_member.member_status IN ('removed', 'left', 'deleted')
      OR user_member.member_role NOT IN ('admin', 'owner')
    )
);
|]
down_m20251117_member_relations_vector :: Text
down_m20251117_member_relations_vector =
T.pack
[r|
-- The aggregate is dropped first: it is defined with migrate_relations_vector_step as its SFUNC,
-- so the step function can only be dropped once the aggregate no longer depends on it.
DROP AGGREGATE migrate_relations_vector(BIGINT, INT, TEXT);
DROP FUNCTION migrate_relations_vector_step(BYTEA, BIGINT, INT, TEXT);
DROP FUNCTION set_member_vector_new_relation(BYTEA, BIGINT, INT, INT);
DROP INDEX idx_group_members_group_id_index_in_group;
ALTER TABLE group_members DROP COLUMN index_in_group;
@@ -0,0 +1,45 @@
{-# LANGUAGE QuasiQuotes #-}
module Simplex.Chat.Store.Postgres.Migrations.M20251128_member_relations_vector_stage_2 where
import Data.Text (Text)
import qualified Data.Text as T
import Text.RawString.QQ (r)
-- Build member_relations_vector for all members that don't have it yet.
-- Uses custom aggregate function migrate_relations_vector defined in M20251117_member_relations_vector.
--
-- Query returns (idx, direction, intro_status) for each introduction:
-- - direction 0 (IDSubjectIntroduced): current member (subject) is re_group_member_id, was introduced to referenced member
-- - direction 1 (IDReferencedIntroduced): current member (subject) is to_group_member_id, referenced member was introduced to it
-- TODO [relations vector] drop group_member_intros at the end of the migration
m20251128_member_relations_vector_stage_2 :: Text
m20251128_member_relations_vector_stage_2 =
T.pack
[r|
-- Build member_relations_vector for every member whose vector is still NULL.
-- For each such member (the subject) the correlated subquery collects one row per introduction
-- that involves it and folds the rows with the migrate_relations_vector aggregate:
--   direction 0: the subject is re_group_member_id, the peer is to_group_member_id;
--   direction 1: the subject is to_group_member_id, the peer is re_group_member_id.
-- The peer's index_in_group is the byte position written in the subject's vector.
-- A subject without introductions gets the aggregate's empty initial value.
UPDATE group_members AS subject
SET member_relations_vector = (
  SELECT migrate_relations_vector(peer.index_in_group, intro.direction, intro.intro_status)
  FROM (
    SELECT i.to_group_member_id AS peer_member_id, 0 AS direction, i.intro_status
    FROM group_member_intros AS i
    WHERE i.re_group_member_id = subject.group_member_id
    UNION ALL
    SELECT i.re_group_member_id AS peer_member_id, 1 AS direction, i.intro_status
    FROM group_member_intros AS i
    WHERE i.to_group_member_id = subject.group_member_id
  ) AS intro
  INNER JOIN group_members AS peer
    ON peer.group_member_id = intro.peer_member_id
)
WHERE subject.member_relations_vector IS NULL;
|]
-- TODO [relations vector] re-create group_member_intros
-- Down migration text for stage 2. The SQL is currently empty: it contains no statements,
-- so nothing written by the stage 2 up migration is reverted by this text.
down_m20251128_member_relations_vector_stage_2 :: Text
down_m20251128_member_relations_vector_stage_2 =
T.pack
[r|
|]
@@ -34,6 +34,41 @@ $$;
-- Schema copy of migrate_relations_vector_step; the body is the same text as in migration
-- 20251117_member_relations_vector.
-- Maps intro_status to a relation status ('re-con': 2 if direction = 0 else 3; 'to-con': 3 if
-- direction = 0 else 2; 'con': 4; anything else: 1) and overwrites the byte at position idx of
-- state with (direction * 8) + status, zero-padding state when idx is beyond its end.
-- A negative idx returns state unchanged.
CREATE FUNCTION test_chat_schema.migrate_relations_vector_step(state bytea, idx bigint, direction integer, intro_status text) RETURNS bytea
LANGUAGE plpgsql IMMUTABLE
AS $$
DECLARE
new_len INT;
result BYTEA;
status INT;
byte_val INT;
BEGIN
IF idx < 0 THEN
RETURN state;
END IF;
IF intro_status = 're-con' THEN
IF direction = 0 THEN status := 2; ELSE status := 3; END IF;
ELSIF intro_status = 'to-con' THEN
IF direction = 0 THEN status := 3; ELSE status := 2; END IF;
ELSIF intro_status = 'con' THEN
status := 4;
ELSE
status := 1;
END IF;
byte_val := (direction * 8) + status;
new_len := GREATEST(length(state), idx + 1);
IF new_len > length(state) THEN
result := state || (SELECT string_agg('\x00'::BYTEA, ''::BYTEA) FROM generate_series(1, new_len - length(state)));
ELSE
result := state;
END IF;
result := set_byte(result, idx::INT, byte_val);
RETURN result;
END;
$$;
CREATE FUNCTION test_chat_schema.on_group_members_delete_update_summary() RETURNS trigger
LANGUAGE plpgsql
AS $$
@@ -85,6 +120,45 @@ END;
$$;
-- Schema copy of set_member_vector_new_relation; the body is the same text as in migration
-- 20251117_member_relations_vector.
-- Returns v with the byte at position idx set to (old_byte & 0xF0) | (direction * 8) | status,
-- zero-padding v to length idx + 1 when idx is beyond its end (old byte taken as 0).
-- A negative idx returns v unchanged. direction and status are not range-checked.
CREATE FUNCTION test_chat_schema.set_member_vector_new_relation(v bytea, idx bigint, direction integer, status integer) RETURNS bytea
LANGUAGE plpgsql IMMUTABLE
AS $$
DECLARE
new_len INT;
result BYTEA;
byte_val INT;
old_byte INT;
BEGIN
IF idx < 0 THEN
RETURN v;
END IF;
IF idx < length(v) THEN
old_byte := get_byte(v, idx::INT);
ELSE
old_byte := 0;
END IF;
byte_val := (old_byte & x'F0'::INT) | (direction * 8) | status;
new_len := GREATEST(length(v), idx + 1);
IF new_len > length(v) THEN
result := v || (SELECT string_agg('\x00'::BYTEA, ''::BYTEA) FROM generate_series(1, new_len - length(v)));
ELSE
result := v;
END IF;
result := set_byte(result, idx::INT, byte_val);
RETURN result;
END;
$$;
-- Schema copy of the migrate_relations_vector aggregate: folds (idx, direction, intro_status) rows
-- with test_chat_schema.migrate_relations_vector_step, starting from an empty bytea state.
CREATE AGGREGATE test_chat_schema.migrate_relations_vector(bigint, integer, text) (
SFUNC = test_chat_schema.migrate_relations_vector_step,
STYPE = bytea,
INITCOND = ''
);
SET default_table_access_method = heap;