From bed00bb7669d2768a7e13715bc89b5c34670225f Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Tue, 14 Apr 2026 18:44:58 +0200 Subject: [PATCH] Allow resigning of events with a new signing key (#19668) This adds a way to re-sign all locally-created events with a new signing key, which is useful when rotating server signing keys. This doesn't trigger automatically, instead needs to be triggered when needed via the admin API. c.f. https://github.com/matrix-org/internal-config/issues/1670#issuecomment-4206020126 for internal discussion. --------- Co-authored-by: Kegan Dougall Co-authored-by: Erik Johnston --- changelog.d/19668.feature | 1 + .../admin_api/background_updates.md | 3 + poetry.lock | 46 ++--- pyproject.toml | 5 +- synapse/crypto/event_signing.py | 55 ++++- synapse/rest/admin/background_updates.py | 19 ++ .../databases/main/events_bg_updates.py | 193 +++++++++++++++++- synapse/types/storage/__init__.py | 2 + tests/crypto/test_event_signing.py | 135 +++++++++++- tests/storage/test_events_bg_updates.py | 168 ++++++++++++++- 10 files changed, 594 insertions(+), 33 deletions(-) create mode 100644 changelog.d/19668.feature diff --git a/changelog.d/19668.feature b/changelog.d/19668.feature new file mode 100644 index 0000000000..ab3585c172 --- /dev/null +++ b/changelog.d/19668.feature @@ -0,0 +1 @@ +Add a way to re-sign local events with a new signing key. diff --git a/docs/usage/administration/admin_api/background_updates.md b/docs/usage/administration/admin_api/background_updates.md index 7b75ee5587..7a78b8964b 100644 --- a/docs/usage/administration/admin_api/background_updates.md +++ b/docs/usage/administration/admin_api/background_updates.md @@ -107,3 +107,6 @@ The following JSON body parameters are available: - `job_name` - A string which job to run. Valid values are: - `populate_stats_process_rooms` - Recalculate the stats for all rooms. - `regenerate_directory` - Recalculate the [user directory](../../../user_directory.md) if it is stale or out of sync. + - `event_resign` - Re-sign all locally-sent events with the current signing key. This is useful after rotating the server's signing key to ensure all historical events are signed with the new key. Optional additional parameters: + - `old_key` - Only re-sign events whose signature verifies against this key. Format: `"ed25519:key_id base64_public_key"` (e.g. `"ed25519:my_old_key XGX0JRS2Af3be3knz2fBiRbApjm2Dh61gXDJA8kcJNI"`). + - `before_ts` - Only re-sign events with a `received_ts` less than this value (milliseconds since the epoch). diff --git a/poetry.lock b/poetry.lock index 9df82f50a6..ef5c13684d 100644 --- a/poetry.lock +++ b/poetry.lock @@ -31,7 +31,7 @@ description = "The ultimate Python library in building OAuth and OpenID Connect optional = true python-versions = ">=3.9" groups = ["main"] -markers = "extra == \"all\" or extra == \"jwt\" or extra == \"oidc\"" +markers = "extra == \"oidc\" or extra == \"jwt\" or extra == \"all\"" files = [ {file = "authlib-1.6.9-py2.py3-none-any.whl", hash = "sha256:f08b4c14e08f0861dc18a32357b33fbcfd2ea86cfe3fe149484b4d764c4a0ac3"}, {file = "authlib-1.6.9.tar.gz", hash = "sha256:d8f2421e7e5980cc1ddb4e32d3f5fa659cfaf60d8eaf3281ebed192e4ab74f04"}, @@ -531,7 +531,7 @@ description = "XML bomb protection for Python stdlib modules" optional = true python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" groups = ["main"] -markers = "extra == \"all\" or extra == \"saml2\"" +markers = "extra == \"saml2\" or extra == \"all\"" files = [ {file = "defusedxml-0.7.1-py2.py3-none-any.whl", hash = "sha256:a352e7e428770286cc899e2542b6cdaedb2b4953ff269a210103ec58f6198a61"}, {file = "defusedxml-0.7.1.tar.gz", hash = "sha256:1bb3032db185915b62d7c6209c5a8792be6a32ab2fedacc84e01b52c51aa3e69"}, @@ -556,7 +556,7 @@ description = "XPath 1.0/2.0/3.0/3.1 parsers and selectors for ElementTree and l optional = true python-versions = ">=3.8" groups = ["main"] -markers = "extra == \"all\" or extra == \"saml2\"" +markers = "extra == \"saml2\" or extra == \"all\"" files = [ {file = "elementpath-4.8.0-py3-none-any.whl", hash = "sha256:5393191f84969bcf8033b05ec4593ef940e58622ea13cefe60ecefbbf09d58d9"}, {file = "elementpath-4.8.0.tar.gz", hash = "sha256:5822a2560d99e2633d95f78694c7ff9646adaa187db520da200a8e9479dc46ae"}, @@ -606,7 +606,7 @@ description = "Python wrapper for hiredis" optional = true python-versions = ">=3.8" groups = ["main"] -markers = "extra == \"all\" or extra == \"redis\"" +markers = "extra == \"redis\" or extra == \"all\"" files = [ {file = "hiredis-3.3.1-cp310-cp310-macosx_10_15_universal2.whl", hash = "sha256:f525734382a47f9828c9d6a1501522c78d5935466d8e2be1a41ba40ca5bb922b"}, {file = "hiredis-3.3.1-cp310-cp310-macosx_10_15_x86_64.whl", hash = "sha256:6e2e1024f0a021777740cb7c633a0efb2c4a4bc570f508223a8dcbcf79f99ef9"}, @@ -930,7 +930,7 @@ description = "Jaeger Python OpenTracing Tracer implementation" optional = true python-versions = ">=3.7" groups = ["main"] -markers = "extra == \"all\" or extra == \"opentracing\"" +markers = "extra == \"opentracing\" or extra == \"all\"" files = [ {file = "jaeger-client-4.8.0.tar.gz", hash = "sha256:3157836edab8e2c209bd2d6ae61113db36f7ee399e66b1dcbb715d87ab49bfe0"}, ] @@ -1122,7 +1122,7 @@ description = "A strictly RFC 4510 conforming LDAP V3 pure Python client library optional = true python-versions = "*" groups = ["main"] -markers = "extra == \"all\" or extra == \"matrix-synapse-ldap3\"" +markers = "extra == \"matrix-synapse-ldap3\" or extra == \"all\"" files = [ {file = "ldap3-2.9.1-py2.py3-none-any.whl", hash = "sha256:5869596fc4948797020d3f03b7939da938778a0f9e2009f7a072ccf92b8e8d70"}, {file = "ldap3-2.9.1.tar.gz", hash = "sha256:f3e7fc4718e3f09dda568b57100095e0ce58633bcabbed8667ce3f8fbaa4229f"}, @@ -1239,7 +1239,7 @@ description = "Powerful and Pythonic XML processing library combining libxml2/li optional = true python-versions = ">=3.8" groups = ["main"] -markers = "extra == \"all\" or extra == \"url-preview\"" +markers = "extra == \"url-preview\" or extra == \"all\"" files = [ {file = "lxml-6.0.2-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:e77dd455b9a16bbd2a5036a63ddbd479c19572af81b624e79ef422f929eef388"}, {file = "lxml-6.0.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:5d444858b9f07cefff6455b983aea9a67f7462ba1f6cbe4a21e8bf6791bf2153"}, @@ -1553,7 +1553,7 @@ description = "An LDAP3 auth provider for Synapse" optional = true python-versions = ">=3.10" groups = ["main"] -markers = "extra == \"all\" or extra == \"matrix-synapse-ldap3\"" +markers = "extra == \"matrix-synapse-ldap3\" or extra == \"all\"" files = [ {file = "matrix_synapse_ldap3-0.4.0-py3-none-any.whl", hash = "sha256:bf080037230d2af5fd3639cb87266de65c1cad7a68ea206278c5b4bf9c1a17f3"}, {file = "matrix_synapse_ldap3-0.4.0.tar.gz", hash = "sha256:cff52ba780170de5e6e8af42863d2648ee23f3bf0a9fea6db52372f9fc00be2b"}, @@ -1834,7 +1834,7 @@ description = "OpenTracing API for Python. See documentation at http://opentraci optional = true python-versions = "*" groups = ["main"] -markers = "extra == \"all\" or extra == \"opentracing\"" +markers = "extra == \"opentracing\" or extra == \"all\"" files = [ {file = "opentracing-2.4.0.tar.gz", hash = "sha256:a173117e6ef580d55874734d1fa7ecb6f3655160b8b8974a2a1e98e5ec9c840d"}, ] @@ -2032,7 +2032,7 @@ description = "psycopg2 - Python-PostgreSQL Database Adapter" optional = true python-versions = ">=3.9" groups = ["main"] -markers = "extra == \"all\" or extra == \"postgres\"" +markers = "extra == \"postgres\" or extra == \"all\"" files = [ {file = "psycopg2-2.9.11-cp310-cp310-win_amd64.whl", hash = "sha256:103e857f46bb76908768ead4e2d0ba1d1a130e7b8ed77d3ae91e8b33481813e8"}, {file = "psycopg2-2.9.11-cp311-cp311-win_amd64.whl", hash = "sha256:210daed32e18f35e3140a1ebe059ac29209dd96468f2f7559aa59f75ee82a5cb"}, @@ -2050,7 +2050,7 @@ description = ".. image:: https://travis-ci.org/chtd/psycopg2cffi.svg?branch=mas optional = true python-versions = "*" groups = ["main"] -markers = "platform_python_implementation == \"PyPy\" and (extra == \"all\" or extra == \"postgres\")" +markers = "platform_python_implementation == \"PyPy\" and (extra == \"postgres\" or extra == \"all\")" files = [ {file = "psycopg2cffi-2.9.0.tar.gz", hash = "sha256:7e272edcd837de3a1d12b62185eb85c45a19feda9e62fa1b120c54f9e8d35c52"}, ] @@ -2066,7 +2066,7 @@ description = "A Simple library to enable psycopg2 compatability" optional = true python-versions = "*" groups = ["main"] -markers = "platform_python_implementation == \"PyPy\" and (extra == \"all\" or extra == \"postgres\")" +markers = "platform_python_implementation == \"PyPy\" and (extra == \"postgres\" or extra == \"all\")" files = [ {file = "psycopg2cffi-compat-1.1.tar.gz", hash = "sha256:d25e921748475522b33d13420aad5c2831c743227dc1f1f2585e0fdb5c914e05"}, ] @@ -2348,7 +2348,7 @@ description = "A development tool to measure, monitor and analyze the memory beh optional = true python-versions = ">=3.6" groups = ["main"] -markers = "extra == \"all\" or extra == \"cache-memory\"" +markers = "extra == \"cache-memory\" or extra == \"all\"" files = [ {file = "Pympler-1.0.1-py3-none-any.whl", hash = "sha256:d260dda9ae781e1eab6ea15bacb84015849833ba5555f141d2d9b7b7473b307d"}, {file = "Pympler-1.0.1.tar.gz", hash = "sha256:993f1a3599ca3f4fcd7160c7545ad06310c9e12f70174ae7ae8d4e25f6c5d3fa"}, @@ -2480,7 +2480,7 @@ description = "Python implementation of SAML Version 2 Standard" optional = true python-versions = ">=3.9,<4.0" groups = ["main"] -markers = "extra == \"all\" or extra == \"saml2\"" +markers = "extra == \"saml2\" or extra == \"all\"" files = [ {file = "pysaml2-7.5.0-py3-none-any.whl", hash = "sha256:bc6627cc344476a83c757f440a73fda1369f13b6fda1b4e16bca63ffbabb5318"}, {file = "pysaml2-7.5.0.tar.gz", hash = "sha256:f36871d4e5ee857c6b85532e942550d2cf90ea4ee943d75eb681044bbc4f54f7"}, @@ -2505,7 +2505,7 @@ description = "Extensions to the standard Python datetime module" optional = true python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" groups = ["main"] -markers = "extra == \"all\" or extra == \"saml2\"" +markers = "extra == \"saml2\" or extra == \"all\"" files = [ {file = "python-dateutil-2.9.0.post0.tar.gz", hash = "sha256:37dd54208da7e1cd875388217d5e00ebd4179249f90fb72437e91a35459a0ad3"}, {file = "python_dateutil-2.9.0.post0-py2.py3-none-any.whl", hash = "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427"}, @@ -2533,7 +2533,7 @@ description = "World timezone definitions, modern and historical" optional = true python-versions = "*" groups = ["main"] -markers = "extra == \"all\" or extra == \"saml2\"" +markers = "extra == \"saml2\" or extra == \"all\"" files = [ {file = "pytz-2026.1.post1-py2.py3-none-any.whl", hash = "sha256:f2fd16142fda348286a75e1a524be810bb05d444e5a081f37f7affc635035f7a"}, {file = "pytz-2026.1.post1.tar.gz", hash = "sha256:3378dde6a0c3d26719182142c56e60c7f9af7e968076f31aae569d72a0358ee1"}, @@ -2938,7 +2938,7 @@ description = "Python client for Sentry (https://sentry.io)" optional = true python-versions = ">=3.6" groups = ["main"] -markers = "extra == \"all\" or extra == \"sentry\"" +markers = "extra == \"sentry\" or extra == \"all\"" files = [ {file = "sentry_sdk-2.57.0-py2.py3-none-any.whl", hash = "sha256:812c8bf5ff3d2f0e89c82f5ce80ab3a6423e102729c4706af7413fd1eb480585"}, {file = "sentry_sdk-2.57.0.tar.gz", hash = "sha256:4be8d1e71c32fb27f79c577a337ac8912137bba4bcbc64a4ec1da4d6d8dc5199"}, @@ -3138,7 +3138,7 @@ description = "Tornado IOLoop Backed Concurrent Futures" optional = true python-versions = "*" groups = ["main"] -markers = "extra == \"all\" or extra == \"opentracing\"" +markers = "extra == \"opentracing\" or extra == \"all\"" files = [ {file = "threadloop-1.0.2-py2-none-any.whl", hash = "sha256:5c90dbefab6ffbdba26afb4829d2a9df8275d13ac7dc58dccb0e279992679599"}, {file = "threadloop-1.0.2.tar.gz", hash = "sha256:8b180aac31013de13c2ad5c834819771992d350267bddb854613ae77ef571944"}, @@ -3154,7 +3154,7 @@ description = "Python bindings for the Apache Thrift RPC system" optional = true python-versions = "*" groups = ["main"] -markers = "extra == \"all\" or extra == \"opentracing\"" +markers = "extra == \"opentracing\" or extra == \"all\"" files = [ {file = "thrift-0.22.0.tar.gz", hash = "sha256:42e8276afbd5f54fe1d364858b6877bc5e5a4a5ed69f6a005b94ca4918fe1466"}, ] @@ -3229,7 +3229,7 @@ description = "Tornado is a Python web framework and asynchronous networking lib optional = true python-versions = ">=3.9" groups = ["main"] -markers = "extra == \"all\" or extra == \"opentracing\"" +markers = "extra == \"opentracing\" or extra == \"all\"" files = [ {file = "tornado-6.5.5-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:487dc9cc380e29f58c7ab88f9e27cdeef04b2140862e5076a66fb6bb68bb1bfa"}, {file = "tornado-6.5.5-cp39-abi3-macosx_10_9_x86_64.whl", hash = "sha256:65a7f1d46d4bb41df1ac99f5fcb685fb25c7e61613742d5108b010975a9a6521"}, @@ -3361,7 +3361,7 @@ description = "non-blocking redis client for python" optional = true python-versions = "*" groups = ["main"] -markers = "extra == \"all\" or extra == \"redis\"" +markers = "extra == \"redis\" or extra == \"all\"" files = [ {file = "txredisapi-1.4.11-py3-none-any.whl", hash = "sha256:ac64d7a9342b58edca13ef267d4fa7637c1aa63f8595e066801c1e8b56b22d0b"}, {file = "txredisapi-1.4.11.tar.gz", hash = "sha256:3eb1af99aefdefb59eb877b1dd08861efad60915e30ad5bf3d5bf6c5cedcdbc6"}, @@ -3622,7 +3622,7 @@ description = "An XML Schema validator and decoder" optional = true python-versions = ">=3.7" groups = ["main"] -markers = "extra == \"all\" or extra == \"saml2\"" +markers = "extra == \"saml2\" or extra == \"all\"" files = [ {file = "xmlschema-2.5.1-py3-none-any.whl", hash = "sha256:ec2b2a15c8896c1fcd14dcee34ca30032b99456c3c43ce793fdb9dca2fb4b869"}, {file = "xmlschema-2.5.1.tar.gz", hash = "sha256:4f7497de6c8b6dc2c28ad7b9ed6e21d186f4afe248a5bea4f54eedab4da44083"}, @@ -3756,4 +3756,4 @@ url-preview = ["lxml"] [metadata] lock-version = "2.1" python-versions = ">=3.10.0,<4.0.0" -content-hash = "ce9ac9da9e7ffaf24b3e1e7892342ba486e7af4ea25385f875d0f3a2d5c5d133" +content-hash = "ef0540b89c417a69668f551688bd0974256ea7a580044f3954a76bdf0d8fe7c9" diff --git a/pyproject.toml b/pyproject.toml index 5fa7dade52..c156d4f899 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,10 @@ dependencies = [ # We require 2.0.0 for immutabledict support. "canonicaljson>=2.0.0,<3.0.0", # we use the type definitions added in signedjson 1.1. - "signedjson>=1.1.0,<2.0.0", + # 1.1.0 erroneously removed decode_verify_key_base64 (reintroduced in 1.1.1). + # 1.1.1 is mispackaged (importlib-metadata dependency without minimum version bound) + # 1.1.2, 1.1.3 and 1.1.4 were all released on the same day, so no good reason to use the older version. + "signedjson>=1.1.4,<2.0.0", # validating SSL certs for IP addresses requires service_identity 18.1. "service-identity>=18.1.0", # Twisted 18.9 introduces some logger improvements that the structured diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index d13d5d04c3..d789c06a9c 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -27,7 +27,7 @@ from typing import Any, Callable from canonicaljson import encode_canonical_json from signedjson.sign import sign_json -from signedjson.types import SigningKey +from signedjson.types import SigningKey, VerifyKey from unpaddedbase64 import decode_base64, encode_base64 from synapse.api.errors import Codes, SynapseError @@ -35,7 +35,7 @@ from synapse.api.room_versions import RoomVersion from synapse.events import EventBase from synapse.events.utils import prune_event, prune_event_dict from synapse.logging.opentracing import trace -from synapse.types import JsonDict +from synapse.types import JsonDict, UserID logger = logging.getLogger(__name__) @@ -192,3 +192,54 @@ def add_hashes_and_signatures( event_dict["signatures"] = compute_event_signature( room_version, event_dict, signature_name=signature_name, signing_key=signing_key ) + + +def resign_event( + ev: EventBase, + server_name: str, + signing_key: SigningKey, + time_now: int | None = None, +) -> JsonDict: + """Re-sign the provided event with the given signing key. Any existing signatures on the event + for this server_name are removed. + + If there has been no signature for this event by this server_name, the event is still re-signed. + If there have been signatures on this event by this server_name, the event is not re-checked for + validity. As such, only events that have valid signatures should be passed into this function + e.g. from the event_json table in the database. + """ + event_dict = ev.get_pdu_json(time_now=time_now) + event_dict["signatures"].pop( + server_name, None + ) # remove existing signatures for this server_name + event_dict["signatures"].update( + compute_event_signature( + ev.room_version, + event_dict, + server_name, + signing_key, + ) + ) + return event_dict + + +def event_needs_resigning( + ev: EventBase, server_name: str, verify_key: VerifyKey +) -> bool: + """Check if this event needs re-signing. + + This returns True if all of the following are True: + - the event `sender` domain matches the `server_name` provided. + - the event has not been already signed with this `verify_key`. + """ + sender = UserID.from_string(ev.sender) + if sender.domain != server_name: + return False + want_key_id = verify_key.alg + ":" + verify_key.version + signed_with_current_key_id = ev.signatures.get(server_name, {}).get( + want_key_id, None + ) + if signed_with_current_key_id: + return False + + return True diff --git a/synapse/rest/admin/background_updates.py b/synapse/rest/admin/background_updates.py index 96190c416d..e693a0afd3 100644 --- a/synapse/rest/admin/background_updates.py +++ b/synapse/rest/admin/background_updates.py @@ -18,6 +18,7 @@ # [This file includes modifications made by New Vector Limited] # # +import json import logging from http import HTTPStatus from typing import TYPE_CHECKING @@ -150,6 +151,24 @@ class BackgroundUpdateStartJobRestServlet(RestServlet): "populate_user_directory_process_users", ), ] + elif job_name == "event_resign": + old_key = body.get("old_key") + if old_key is not None and not isinstance(old_key, str): + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "'old_key' must be a string", + ) + before_ts = body.get("before_ts") + if before_ts is not None and not isinstance(before_ts, int): + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "'before_ts' must be an integer", + ) + progress = { + "old_key": old_key, + "before_ts": before_ts, + } + jobs = [("event_resign", json.dumps(progress), "")] else: raise SynapseError(HTTPStatus.BAD_REQUEST, "Invalid job_name") diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 934cd157ca..d2623f0760 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -23,6 +23,8 @@ import logging from typing import TYPE_CHECKING, cast import attr +from signedjson.key import decode_verify_key_base64, get_verify_key +from signedjson.sign import SignatureVerifyException, verify_signed_json from synapse.api.constants import ( MAX_DEPTH, @@ -31,7 +33,12 @@ from synapse.api.constants import ( RelationTypes, ) from synapse.api.room_versions import KNOWN_ROOM_VERSIONS +from synapse.crypto.event_signing import ( + event_needs_resigning, + resign_event, +) from synapse.events import EventBase, make_event_from_dict +from synapse.events.utils import prune_event_dict from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import ( DatabasePool, @@ -39,6 +46,7 @@ from synapse.storage.database import ( LoggingTransaction, make_tuple_comparison_clause, ) +from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.databases.main.events import ( SLIDING_SYNC_RELEVANT_STATE_SET, PersistEventsStore, @@ -48,6 +56,7 @@ from synapse.storage.databases.main.events import ( ) from synapse.storage.databases.main.events_worker import ( DatabaseCorruptionError, + EventRedactBehaviour, InvalidEventError, ) from synapse.storage.databases.main.state_deltas import StateDeltasStore @@ -112,7 +121,9 @@ class _JoinedRoomStreamOrderingUpdate: most_recent_bump_stamp: int | None -class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseStore): +class EventsBackgroundUpdatesStore( + StreamWorkerStore, StateDeltasStore, CacheInvalidationWorkerStore, SQLBaseStore +): def __init__( self, database: DatabasePool, @@ -346,6 +357,11 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS _BackgroundUpdates.FIXUP_MAX_DEPTH_CAP, self.fixup_max_depth_cap_bg_update ) + self.db_pool.updates.register_background_update_handler( + _BackgroundUpdates.EVENT_RESIGN, + self._resign_events, + ) + # We want this to run on the main database at startup before we start processing # events. # @@ -1370,7 +1386,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS ) # Iterate the parent IDs and invalidate caches. - self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined] + self._invalidate_cache_and_stream_bulk( txn, self.get_relations_for_event, # type: ignore[attr-defined] { @@ -1381,7 +1397,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS for r in relations_to_insert }, ) - self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined] + self._invalidate_cache_and_stream_bulk( txn, self.get_thread_summary, # type: ignore[attr-defined] {(r[1],) for r in relations_to_insert}, @@ -2713,6 +2729,177 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS return num_rooms + async def _resign_events(self, progress: dict, batch_size: int) -> int: + """Retroactively re-sign events signed with a different key than the + current signing key. + + Optional progress parameters: + old_key: If set, only re-sign events whose signature can be + verified with this key. Format: "algorithm:key_id base64key" + (e.g. "ed25519:my_old_key XGX0JRS2Af3be3k..."). + before_ts: If set, only re-sign events with a received_ts less + than this value (milliseconds since epoch). + """ + + # Read optional filter parameters from progress. These are set once + # when the job is created and preserved across batches. + old_key_str: str | None = progress.get("old_key") + before_ts: int | None = progress.get("before_ts") + + # Parse the old verify key if provided. + old_verify_key = None + if old_key_str is not None: + parts = old_key_str.split(" ", 1) + if len(parts) == 2: + key_id, key_base64 = parts + alg, _, version = key_id.partition(":") + old_verify_key = decode_verify_key_base64(alg, version, key_base64) + else: + raise ValueError( + f"Invalid old_key format: expected 'algorithm:version base64key', got {old_key_str!r}" + ) + + # Load the next set of candidate events to re-sign. + # Returns the event IDs and the highest stream position for those events. + # If no event IDs are returned, this signals the background update is complete. + def _fetch_next_events_txn( + txn: LoggingTransaction, + ) -> tuple[list[str], int]: + # Start from the minimum 32-bit integer to ensure we cover events + # with negative stream orderings (e.g. from backfill). + last_stream_pos: int = progress.get("last_stream_pos", -(1 << 31)) + + sql = """ + SELECT event_id, stream_ordering FROM events + WHERE stream_ordering > ? AND sender LIKE ? + """ + args: list[object] = [ + last_stream_pos, + f"%:{self.hs.hostname}", + ] + + if before_ts is not None: + sql += " AND received_ts < ?" + args.append(before_ts) + + sql += " ORDER BY stream_ordering ASC LIMIT ?" + args.append(batch_size) + + txn.execute(sql, args) + event_rows: list[tuple[str, int]] = txn.fetchall() + if not event_rows: + return [], last_stream_pos + + last_stream_pos = event_rows[-1][1] + return [row[0] for row in event_rows], last_stream_pos + + next_event_ids, max_stream_pos = await self.db_pool.runInteraction( + "_resign_events._fetch_next_events", + _fetch_next_events_txn, + ) + logger.debug( + "Resign[num_checking=%d,sp=%d]", len(next_event_ids), max_stream_pos + ) + + if not next_event_ids: + await self.db_pool.updates._end_background_update( + _BackgroundUpdates.EVENT_RESIGN + ) + return 0 + + next_events = await self.get_events_as_list( + next_event_ids, + redact_behaviour=EventRedactBehaviour.as_is, + ) + current_verify_key = get_verify_key(self.hs.signing_key) + + # Re-sign any events that need it. + # A list of event IDs and their newly signed event dicts. + resigned_events: list[tuple[str, JsonDict]] = [] + for event in next_events: + if not event_needs_resigning(event, self.hs.hostname, current_verify_key): + continue + + # If old_key is set, only re-sign events whose signature verifies + # with the provided old key. + if old_verify_key is not None: + old_key_id = f"{old_verify_key.alg}:{old_verify_key.version}" + server_sigs = event.signatures.get(self.hs.hostname, {}) + if old_key_id not in server_sigs: + # Event wasn't signed with this key ID at all, skip. + continue + + # Verify the signature is genuinely from this key. We prune + # first since signatures are computed over the redacted form. + pruned = prune_event_dict(event.room_version, event.get_pdu_json()) + try: + verify_signed_json(pruned, self.hs.hostname, old_verify_key) + except SignatureVerifyException: + # In this case, the key ID was right but the signature doesn't match + # the public key we had. We definitely need to log about this. + logger.warning( + "Event %s has a signature for key %s that does not " + "verify — skipping", + event.event_id, + old_key_id, + ) + continue + + event_dict = resign_event(event, self.hs.hostname, self.hs.signing_key) + resigned_events.append((event.event_id, event_dict)) + + # Atomically write the new stream pos progress with the new signatures, + # else we may update the pos and crash before writing the new + # signatures, thus not re-signing at all! + def _write_events_txn( + txn: LoggingTransaction, + events_to_write: list[tuple[str, JsonDict]], + max_stream_pos: int, + ) -> None: + if events_to_write: + self.db_pool.simple_update_many_txn( + txn, + "event_json", + key_names=["event_id"], + key_values=[[event_id] for event_id, _ in events_to_write], + value_names=["json"], + value_values=[ + [json_encoder.encode(event_dict)] + for _, event_dict in events_to_write + ], + ) + # Always update the progress even if we re-sign nothing. + self.db_pool.updates._background_update_progress_txn( + txn, + _BackgroundUpdates.EVENT_RESIGN, + progress={ + "last_stream_pos": max_stream_pos, + "old_key": old_key_str, + "before_ts": before_ts, + }, + ) + + # Invalidate the event cache for re-signed events so that other + # workers also pick up the new signatures. + for event_id, _ in events_to_write: + self.invalidate_get_event_cache_after_txn(txn, event_id) + self._send_invalidation_to_replication( + txn, "_get_event_cache", (event_id,) + ) + + await self.db_pool.runInteraction( + "_resign_events._write_events_txn", + _write_events_txn, + resigned_events, + max_stream_pos, + ) + + logger.info("Re-signed %d events", len(resigned_events)) + + # Even if we don't re-sign them, we need to let the background updater + # know we're still churning through the events. + return len(next_event_ids) + def _resolve_stale_data_in_sliding_sync_tables( txn: LoggingTransaction, diff --git a/synapse/types/storage/__init__.py b/synapse/types/storage/__init__.py index 992c36caba..6b857aeb0e 100644 --- a/synapse/types/storage/__init__.py +++ b/synapse/types/storage/__init__.py @@ -66,3 +66,5 @@ class _BackgroundUpdates: FIXUP_MAX_DEPTH_CAP = "fixup_max_depth_cap" REDACTIONS_RECHECK_BG_UPDATE = "redactions_recheck" + + EVENT_RESIGN = "event_resign" diff --git a/tests/crypto/test_event_signing.py b/tests/crypto/test_event_signing.py index 9cdc1604da..334ff64bc2 100644 --- a/tests/crypto/test_event_signing.py +++ b/tests/crypto/test_event_signing.py @@ -19,12 +19,23 @@ # # -from signedjson.key import decode_signing_key_base64 +from typing import TypedDict + +from signedjson.key import ( + decode_signing_key_base64, + generate_signing_key, + get_verify_key, +) from signedjson.types import SigningKey from synapse.api.room_versions import RoomVersions -from synapse.crypto.event_signing import add_hashes_and_signatures -from synapse.events import make_event_from_dict +from synapse.crypto.event_signing import ( + add_hashes_and_signatures, + event_needs_resigning, + resign_event, +) +from synapse.events import EventBase, make_event_from_dict +from synapse.types import JsonDict from tests import unittest @@ -107,3 +118,121 @@ class EventSigningTestCase(unittest.TestCase): "Ay4aj2b5oJ1k8INYZ9n3KnszCflM0emwcmQQ7vxpbdc" "Sv9bkJxIZdWX1IJllcZLq89+D3sSabE+vqPtZs9akDw", ) + + +class EventResigningTestCase(unittest.TestCase): + def setUp(self) -> None: + self.signing_key: SigningKey = decode_signing_key_base64( + KEY_ALG, KEY_VER, SIGNING_KEY_SEED + ) + + def test_resign(self) -> None: + event_dict: JsonDict = { + "content": {"body": "Here is the message content"}, + "event_id": "$fffff:" + HOSTNAME, + "origin_server_ts": 1000000, + "type": "m.room.message", + "room_id": "!r:" + HOSTNAME, + "sender": "@u:" + HOSTNAME, + "signatures": {}, + "unsigned": {"age_ts": 1000000}, + } + add_hashes_and_signatures( + RoomVersions.V1, event_dict, HOSTNAME, self.signing_key + ) + event = make_event_from_dict(event_dict) + self.assertIn(HOSTNAME, event.signatures) + self.assertIn(KEY_NAME, event.signatures[HOSTNAME]) + signature = event.signatures[HOSTNAME][KEY_NAME] + + # Re-sign with a different key + signing_key_2: SigningKey = generate_signing_key("2") + key_name_2 = "ed25519:2" + + resigned_event = resign_event(event, HOSTNAME, signing_key_2) + self.assertIn(HOSTNAME, resigned_event["signatures"]) + self.assertIn(key_name_2, resigned_event["signatures"][HOSTNAME]) + self.assertEqual( + len(resigned_event["signatures"][HOSTNAME]), 1 + ) # the previous signature was removed. + self.assertNotEqual( + signature, resigned_event["signatures"][HOSTNAME][key_name_2] + ) # different signatures + + # Repeat but with an event without any signatures. + event_dict = { + "content": {"body": "Here is the message content"}, + "event_id": "$fffff:" + HOSTNAME, + "origin_server_ts": 1000000, + "type": "m.room.message", + "room_id": "!r:" + HOSTNAME, + "sender": "@u:" + HOSTNAME, + "signatures": {}, + "unsigned": {"age_ts": 1000000}, + } + event = make_event_from_dict(event_dict) + resigned_event = resign_event(event, HOSTNAME, signing_key_2) + self.assertIn(HOSTNAME, resigned_event["signatures"]) + self.assertIn(key_name_2, resigned_event["signatures"][HOSTNAME]) + self.assertEqual(len(resigned_event["signatures"][HOSTNAME]), 1) + + def test_event_needs_resigning(self) -> None: + event_that_needs_resigning_dict: JsonDict = { + "content": {"body": "Here is the message content"}, + "event_id": "$fffff:" + HOSTNAME, + "origin_server_ts": 1000000, + "type": "m.room.message", + "room_id": "!r:" + HOSTNAME, + "sender": "@u:" + HOSTNAME, + "unsigned": {"age_ts": 1000000}, + } + internal_metadata: JsonDict = {} + event_that_needs_resigning = make_event_from_dict( + event_that_needs_resigning_dict, + RoomVersions.V1, + internal_metadata, + ) + self.assertEqual( + event_needs_resigning( + event_that_needs_resigning, HOSTNAME, get_verify_key(self.signing_key) + ), + True, + ) + + class TestCase(TypedDict): + name: str + event: EventBase + + events_that_dont_need_resigning: list[TestCase] = [ + { + "name": "sender domain isn't ours", + "event": make_event_from_dict( + {**event_that_needs_resigning_dict, "sender": "@u:somewhereelse"}, + RoomVersions.V1, + internal_metadata, + ), + }, + { + "name": "already signed with this key", + "event": make_event_from_dict( + { + **event_that_needs_resigning_dict, + "signatures": { + HOSTNAME: { + KEY_NAME: "thisisntchecked", + }, + }, + }, + RoomVersions.V1, + internal_metadata, + ), + }, + ] + for test_case in events_that_dont_need_resigning: + self.assertEqual( + event_needs_resigning( + test_case["event"], HOSTNAME, get_verify_key(self.signing_key) + ), + False, + test_case["name"], + ) diff --git a/tests/storage/test_events_bg_updates.py b/tests/storage/test_events_bg_updates.py index a5b53de77f..aceacec98e 100644 --- a/tests/storage/test_events_bg_updates.py +++ b/tests/storage/test_events_bg_updates.py @@ -14,17 +14,23 @@ # +import json + +import signedjson.key from canonicaljson import encode_canonical_json from twisted.internet.testing import MemoryReactor from synapse.api.constants import MAX_DEPTH from synapse.api.room_versions import RoomVersion, RoomVersions +from synapse.rest import admin +from synapse.rest.client import login, room from synapse.server import HomeServer +from synapse.storage.background_updates import BackgroundUpdater from synapse.types.storage import _BackgroundUpdates from synapse.util.clock import Clock -from tests.unittest import HomeserverTestCase +from tests.unittest import HomeserverTestCase, override_config class TestFixupMaxDepthCapBgUpdate(HomeserverTestCase): @@ -287,3 +293,163 @@ class TestRedactionsRecheckBgUpdate(HomeserverTestCase): self.assertTrue(self._get_recheck("$redact5:test")) self.assertFalse(self._get_recheck("$redact6:test")) self.assertTrue(self._get_recheck("$redact7:test")) + + +class TestResignEventsBgUpdate(HomeserverTestCase): + """Test the background update that re-signs events.""" + + servlets = [ + admin.register_servlets, + room.register_servlets, + login.register_servlets, + ] + + def prepare( + self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer + ) -> None: + self.updates: BackgroundUpdater = self.hs.get_datastores().main.db_pool.updates + self.store = self.hs.get_datastores().main + self.db_pool = self.store.db_pool + + self.room_id = "!testroom:example.com" + + @override_config({"caches": {"global_factor": 1}, "event_cache_size": "999"}) + def test_events_are_resigned_after_bg_update_runs(self) -> None: + """Test that the background update correctly re-signs existing events with the + new key""" + + # Ensure all background updates have finished running + self.wait_for_background_updates() + + # Set up a room with a local and remote user in it. + self.register_user("user", "pass") + token = self.login("user", "pass") + + # Create new room + room_id = self.helper.create_room_as( + "user", room_version=RoomVersions.V12.identifier, tok=token + ) + + # Send a message + body = self.helper.send(room_id, body="Test", tok=token) + + old_event = self.get_success(self.store.get_event(body["event_id"])) + old_key_id = f"{self.hs.signing_key.alg}:{self.hs.signing_key.version}" + + # Ensure the message event is in the cache so that we test the cache is + # invalidated properly + res = self.store._get_event_cache.get_local((old_event.event_id,)) + self.assertEqual(res.event, old_event, "Event not cached as expected.") # type: ignore + + # Ensure message event is signed with original signing key + self.assertIn( + old_key_id, old_event.signatures[self.hs.config.server.server_name] + ) + + # Generate a new signing key + self.hs.signing_key = signedjson.key.generate_signing_key("new-test-key") + + # Reinsert the background update as it was already run at the start of + # the test. + self.get_success( + self.db_pool.simple_insert( + table="background_updates", + values={ + "update_name": "event_resign", + "progress_json": "{}", + }, + ) + ) + self.updates.start_doing_background_updates() + # Ensure the background updates have finished running + self.wait_for_background_updates() + + # Get the event from the database again + new_event = self.get_success(self.store.get_event(body["event_id"])) + new_key_id = f"{self.hs.signing_key.alg}:{self.hs.signing_key.version}" + + # Ensure message event is signed with new signing key, and not with the original + # signing key + self.assertNotIn( + old_key_id, new_event.signatures[self.hs.config.server.server_name] + ) + self.assertIn( + new_key_id, new_event.signatures[self.hs.config.server.server_name] + ) + + @override_config({"caches": {"global_factor": 1}, "event_cache_size": "999"}) + def test_old_key_filter(self) -> None: + """Test that old_key parameter causes only events whose signature + verifies against the provided key to be re-signed.""" + + self.wait_for_background_updates() + + self.register_user("user2", "pass") + token = self.login("user2", "pass") + + room_id = self.helper.create_room_as( + "user2", room_version=RoomVersions.V12.identifier, tok=token + ) + body = self.helper.send(room_id, body="Test old_key", tok=token) + + old_signing_key = self.hs.signing_key + old_key_id = f"{old_signing_key.alg}:{old_signing_key.version}" + old_verify_key = signedjson.key.get_verify_key(old_signing_key) + old_key_param = ( + f"{old_verify_key.alg}:{old_verify_key.version} " + f"{signedjson.key.encode_verify_key_base64(old_verify_key)}" + ) + + # Generate a new signing key + self.hs.signing_key = signedjson.key.generate_signing_key("new-test-key-2") + + # Generate a different key but reuse the same key ID/version, to + # ensure we're filtering on the actual public key, not just the ID. + wrong_key = signedjson.key.generate_signing_key(old_signing_key.version) + wrong_verify_key = signedjson.key.get_verify_key(wrong_key) + wrong_key_param = ( + f"{old_verify_key.alg}:{old_verify_key.version} " + f"{signedjson.key.encode_verify_key_base64(wrong_verify_key)}" + ) + + # Insert BG update with old_key filter pointing to a WRONG key + self.get_success( + self.db_pool.simple_insert( + table="background_updates", + values={ + "update_name": "event_resign", + "progress_json": json.dumps({"old_key": wrong_key_param}), + }, + ) + ) + self.updates.start_doing_background_updates() + self.wait_for_background_updates() + + # Event should NOT have been re-signed (wrong key) + event_after = self.get_success(self.store.get_event(body["event_id"])) + self.assertIn( + old_key_id, event_after.signatures[self.hs.config.server.server_name] + ) + + # Now insert BG update with the CORRECT old key + self.get_success( + self.db_pool.simple_insert( + table="background_updates", + values={ + "update_name": "event_resign", + "progress_json": json.dumps({"old_key": old_key_param}), + }, + ) + ) + self.updates.start_doing_background_updates() + self.wait_for_background_updates() + + # Event should now be re-signed + new_event = self.get_success(self.store.get_event(body["event_id"])) + new_key_id = f"{self.hs.signing_key.alg}:{self.hs.signing_key.version}" + self.assertNotIn( + old_key_id, new_event.signatures[self.hs.config.server.server_name] + ) + self.assertIn( + new_key_id, new_event.signatures[self.hs.config.server.server_name] + )