Files
synapse/complement/tests/federation_room_join_test.go
Eric Eastwood f52b144c5f Explain natural behavior of events lost in the /make_join//send_join gap (#19856)
To better explain that this new thing we're doing in
https://github.com/element-hq/synapse/pull/19390 is a good citizen thing, not a requirement.

Follow-up to https://github.com/element-hq/synapse/pull/19390
2026-06-16 09:38:44 -05:00

188 lines
7.6 KiB
Go

package synapse_tests
import (
"context"
"io"
"net/http"
"sync/atomic"
"testing"
"time"
"github.com/matrix-org/complement"
"github.com/matrix-org/gomatrixserverlib"
"github.com/tidwall/gjson"
"github.com/matrix-org/complement/b"
"github.com/matrix-org/complement/federation"
"github.com/matrix-org/complement/helpers"
"github.com/matrix-org/complement/must"
)
// This test verifies that events sent into a room between a /make_join and
// /send_join are not lost to the joining server. When an event is created
// during the join handshake, the join event's prev_events (set at make_join
// time) won't reference it, creating two forward extremities. The server
// handling the join should ensure the joining server can discover the missed
// event, for example by sending a follow-up event that references both
// extremities, prompting the joining server to backfill.
//
// See https://github.com/element-hq/synapse/pull/19390
//
// This test lives as an in-repo Synapse Complement test because the spec doesn't mandate
// which events should be resolvable after the `/make_join`/`/send_join` dance (or that
// a homeserver should send `m.dummy` events to tie things together).
//
// To be clear, the events in the `/make_join`/`/send_join` gap would be resolved
// naturally as soon as a user on a homeserver that is aware of those events sends
// another event, because that event ties them into the DAG. The goal of the automatic
// dummy events is to make this happen sooner by sending a `m.dummy` event that ties
// things in, instead of waiting for another event to be sent naturally.
func TestEventBetweenMakeJoinAndSendJoinIsNotLost(t *testing.T) {
	deployment := complement.Deploy(t, 1)
	defer deployment.Destroy(t)

	alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{})

	// We track the message event ID sent between make_join and send_join.
	// After send_join, we wait for hs1 to send us either:
	//   - the message event itself, or
	//   - any event whose prev_events reference the message (e.g. a dummy event)
	//
	// atomic.Value is used because messageEventID is written on the main goroutine and
	// read on the HTTP handler goroutine, and needs synchronization (without
	// synchronization, writes are not guaranteed to be observed by other goroutines).
	// The empty string means "not known yet".
	var messageEventID atomic.Value
	messageEventID.Store("")

	messageDiscoverableWaiter := helpers.NewWaiter()

	srv := federation.NewServer(t, deployment,
		// hs1 fetches our signing keys via /_matrix/key/v2/server to verify our
		// identity before accepting federation requests. Without this handler,
		// make_join is rejected with 401 M_UNAUTHORIZED.
		federation.HandleKeyRequests(),
	)
	// After send_join, hs1 will start sending us federation transactions via
	// /_matrix/federation/v1/send/{txnID}. Since we handle /send manually
	// below, any other requests (e.g. key fetches) that arrive unexpectedly
	// should be tolerated rather than treated as test failures.
	srv.UnexpectedRequestsAreErrors = false

	// Custom /send handler: hs1 will push new room events to us via federation
	// transactions once we've joined. We use a raw handler because the
	// Complement server is not fully in the room until send_join completes, so
	// we can't use HandleTransactionRequests (which requires the room in
	// srv.rooms). Instead we parse the raw transaction body ourselves.
	srv.Mux().
		Handle("/_matrix/federation/v1/send/{transactionID}", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
			// This handler runs on an HTTP server goroutine, not on the goroutine
			// running the test. The testing package only allows FailNow/Fatal* from
			// the test goroutine, so failures here are reported with t.Errorf (safe
			// from any goroutine) followed by an explicit return, rather than with
			// the fatal must.* helpers.
			body, err := io.ReadAll(req.Body)
			if err != nil {
				t.Errorf("failed to read request body in /send handler: %v", err)
				http.Error(w, "failed to read request body", http.StatusInternalServerError)
				return
			}

			txn := gjson.ParseBytes(body)
			txn.Get("pdus").ForEach(func(_, pdu gjson.Result) bool {
				eventID := pdu.Get("event_id").String()
				eventType := pdu.Get("type").String()
				t.Logf("Received PDU via /send: type=%s id=%s", eventType, eventID)

				// messageEventID is set after make_join but before send_join.
				// Transactions can arrive before that window, so skip PDUs that
				// arrive before we know which event to look for.
				msgID, _ := messageEventID.Load().(string)
				if msgID == "" {
					return true
				}

				// Check if this IS the message event (server pushed it directly).
				if eventID == msgID {
					messageDiscoverableWaiter.Finish()
					return true
				}

				// Check if this event's prev_events directly reference the message (e.g. a dummy
				// event tying the two forward extremities together). If so, the joining server
				// can backfill from that event and will discover the message.
				//
				// XXX: We only check one level of prev_events: if the reference is deeper in the
				// DAG, it's valid and the joining server can still reach the message through
				// backfill but our checks don't account for that yet (feel free to edit this
				// assertion if you run into this)
				pdu.Get("prev_events").ForEach(func(_, prevEvent gjson.Result) bool {
					if prevEvent.String() == msgID {
						messageDiscoverableWaiter.Finish()
						// Found a reference; no need to look at the remaining prev_events.
						return false
					}
					return true
				})

				// Keep iterating over the remaining PDUs in the transaction.
				return true
			})

			w.WriteHeader(http.StatusOK)
			// Respond with an empty PDU error map, which is the federation /send
			// success response format: each key would be a PDU ID whose processing
			// failed; an empty object means all PDUs were accepted.
			if _, err := w.Write([]byte(`{"pdus":{}}`)); err != nil {
				t.Errorf("failed to write response body in /send handler: %v", err)
			}
		})).
		Methods("PUT")

	cancel := srv.Listen()
	defer cancel()

	// Alice creates a room on hs1.
	roomID := alice.MustCreateRoom(t, map[string]interface{}{
		"preset": "public_chat",
	})

	charlie := srv.UserID("charlie")
	origin := srv.ServerName()
	fedClient := srv.FederationClient(deployment)
	// Both halves of the join handshake target hs1, so resolve its name once.
	hs1Name := deployment.GetFullyQualifiedHomeserverName(t, "hs1")

	// Step 1: make_join, hs1 returns a join event template whose prev_events
	// reflect the current room DAG tips.
	makeJoinResp, err := fedClient.MakeJoin(
		context.Background(), origin,
		hs1Name,
		roomID, charlie,
	)
	must.NotError(t, "MakeJoin", err)

	// Step 2: Alice sends a message on hs1. This advances the DAG past the
	// point captured by make_join's prev_events. The Complement server is not
	// yet in the room, so it won't receive this event via normal federation.
	sentMessageID := alice.SendEventSynced(t, roomID, b.Event{
		Type: "m.room.message",
		Content: map[string]interface{}{
			"msgtype": "m.text",
			"body":    "Message sent between make_join and send_join",
		},
	})
	// Publish the ID to the /send handler goroutine.
	messageEventID.Store(sentMessageID)
	t.Logf("Alice sent message %s between make_join and send_join", sentMessageID)

	// Step 3: Build and sign the join event, then send_join.
	// The join event's prev_events are from step 1 (before the message),
	// so persisting it on hs1 creates two forward extremities: the message
	// and the join.
	verImpl, err := gomatrixserverlib.GetRoomVersion(makeJoinResp.RoomVersion)
	must.NotError(t, "GetRoomVersion", err)
	eb := verImpl.NewEventBuilderFromProtoEvent(&makeJoinResp.JoinEvent)
	joinEvent, err := eb.Build(time.Now(), srv.ServerName(), srv.KeyID, srv.Priv)
	must.NotError(t, "Build join event", err)

	_, err = fedClient.SendJoin(
		context.Background(), origin,
		hs1Name,
		joinEvent,
	)
	must.NotError(t, "SendJoin", err)

	// Step 4: hs1 should make the missed message discoverable to the joining
	// server. We accept either receiving the message event directly, or
	// receiving any event whose prev_events reference it (allowing the
	// joining server to backfill).
	messageDiscoverableWaiter.Waitf(t, 5*time.Second,
		"Timed out waiting for message event %s to become discoverable — "+
			"the event sent between make_join and send_join was lost to the "+
			"joining server", sentMessageID,
	)
}