mirror of
https://github.com/element-hq/synapse.git
synced 2026-07-02 09:12:16 +00:00
f52b144c5f
To better explain that this new thing we're doing in https://github.com/element-hq/synapse/pull/19390 is a good citizen thing, not a requirement. Follow-up to https://github.com/element-hq/synapse/pull/19390
188 lines
7.6 KiB
Go
188 lines
7.6 KiB
Go
package synapse_tests
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"net/http"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/matrix-org/complement"
|
|
|
|
"github.com/matrix-org/gomatrixserverlib"
|
|
|
|
"github.com/tidwall/gjson"
|
|
|
|
"github.com/matrix-org/complement/b"
|
|
"github.com/matrix-org/complement/federation"
|
|
"github.com/matrix-org/complement/helpers"
|
|
"github.com/matrix-org/complement/must"
|
|
)
|
|
|
|
// This test verifies that events sent into a room between a /make_join and
|
|
// /send_join are not lost to the joining server. When an event is created
|
|
// during the join handshake, the join event's prev_events (set at make_join
|
|
// time) won't reference it, creating two forward extremities. The server
|
|
// handling the join should ensure the joining server can discover the missed
|
|
// event, for example by sending a follow-up event that references both
|
|
// extremities, prompting the joining server to backfill.
|
|
//
|
|
// See https://github.com/element-hq/synapse/pull/19390
|
|
//
|
|
// This test lives as a in-repo Synapse Complement test because the spec doesn't mandate
|
|
// which events should be resolvable after the `/make_join`/`/send_join` dance (or that
|
|
// a homeserver should send `m.dummy` events to tie things together).
|
|
//
|
|
// To be clear, resolving the events in the `/make_join`/`/send_join` gap would happen
|
|
// naturally as soon as someone else sends an event who is on a homeserver aware of the
|
|
// events in the gap (tie it into the DAG). The goal of the automatic dummy events is to
|
|
// make this happen more immediately by sending a `m.dummy` event that ties things in
|
|
// instead of waiting for another event to be sent naturally.
|
|
func TestEventBetweenMakeJoinAndSendJoinIsNotLost(t *testing.T) {
|
|
deployment := complement.Deploy(t, 1)
|
|
defer deployment.Destroy(t)
|
|
|
|
alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{})
|
|
|
|
// We track the message event ID sent between make_join and send_join.
|
|
// After send_join, we wait for hs1 to send us either:
|
|
// - the message event itself, or
|
|
// - any event whose prev_events reference the message (e.g. a dummy event)
|
|
//
|
|
// atomic.Value is used because messageEventID is written on the main goroutine and
|
|
// read on the HTTP handler goroutine, and needs synchronization (without
|
|
// synchronization, writes are not guaranteed to be observed by other goroutines)
|
|
var messageEventID atomic.Value
|
|
messageEventID.Store("")
|
|
messageDiscoverableWaiter := helpers.NewWaiter()
|
|
|
|
srv := federation.NewServer(t, deployment,
|
|
// hs1 fetches our signing keys via /_matrix/key/v2/server to verify our
|
|
// identity before accepting federation requests. Without this handler,
|
|
// make_join is rejected with 401 M_UNAUTHORIZED.
|
|
federation.HandleKeyRequests(),
|
|
)
|
|
// After send_join, hs1 will start sending us federation transactions via
|
|
// /_matrix/federation/v1/send/{txnID}. Since we handle /send manually
|
|
// below, any other requests (e.g. key fetches) that arrive unexpectedly
|
|
// should be tolerated rather than treated as test failures.
|
|
srv.UnexpectedRequestsAreErrors = false
|
|
|
|
// Custom /send handler: hs1 will push new room events to us via federation
|
|
// transactions once we've joined. We use a raw handler because the
|
|
// Complement server is not fully in the room until send_join completes, so
|
|
// we can't use HandleTransactionRequests (which requires the room in
|
|
// srv.rooms). Instead we parse the raw transaction body ourselves.
|
|
srv.Mux().
|
|
Handle("/_matrix/federation/v1/send/{transactionID}", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
|
body, err := io.ReadAll(req.Body)
|
|
must.NotError(t, "failed to read request body in /send handler: %v", err)
|
|
txn := gjson.ParseBytes(body)
|
|
txn.Get("pdus").ForEach(func(_, pdu gjson.Result) bool {
|
|
eventID := pdu.Get("event_id").String()
|
|
eventType := pdu.Get("type").String()
|
|
t.Logf("Received PDU via /send: type=%s id=%s", eventType, eventID)
|
|
|
|
// messageEventID is set after make_join but before send_join.
|
|
// Transactions can arrive before that window, so skip PDUs that
|
|
// arrive before we know which event to look for.
|
|
msgID, _ := messageEventID.Load().(string)
|
|
if msgID == "" {
|
|
return true
|
|
}
|
|
|
|
// Check if this IS the message event (server pushed it directly).
|
|
if eventID == msgID {
|
|
messageDiscoverableWaiter.Finish()
|
|
return true
|
|
}
|
|
|
|
// Check if this event's prev_events directly reference the message (e.g. a dummy
|
|
// event tying the two forward extremities together). If so, the joining server
|
|
// can backfill from that event and will discover the message.
|
|
//
|
|
// XXX: We only check one level of prev_events: if the reference is deeper in the
|
|
// DAG, it's valid and the joining server can still reach the message through
|
|
// backfill but our checks don't account for that yet (feel free to edit this
|
|
// assertion if you run into this)
|
|
pdu.Get("prev_events").ForEach(func(_, prevEvent gjson.Result) bool {
|
|
if prevEvent.String() == msgID {
|
|
messageDiscoverableWaiter.Finish()
|
|
return false
|
|
}
|
|
return true
|
|
})
|
|
|
|
return true
|
|
})
|
|
w.WriteHeader(200)
|
|
// Respond with an empty PDU error map, which is the federation /send
|
|
// success response format: each key would be a PDU ID whose processing
|
|
// failed; an empty object means all PDUs were accepted.
|
|
_, err = w.Write([]byte(`{"pdus":{}}`))
|
|
must.NotError(t, "failed to write response body in /send handler: %v", err)
|
|
})).
|
|
Methods("PUT")
|
|
|
|
cancel := srv.Listen()
|
|
defer cancel()
|
|
|
|
// Alice creates a room on hs1.
|
|
roomID := alice.MustCreateRoom(t, map[string]interface{}{
|
|
"preset": "public_chat",
|
|
})
|
|
|
|
charlie := srv.UserID("charlie")
|
|
origin := srv.ServerName()
|
|
fedClient := srv.FederationClient(deployment)
|
|
|
|
// Step 1: make_join, hs1 returns a join event template whose prev_events
|
|
// reflect the current room DAG tips.
|
|
makeJoinResp, err := fedClient.MakeJoin(
|
|
context.Background(), origin,
|
|
deployment.GetFullyQualifiedHomeserverName(t, "hs1"),
|
|
roomID, charlie,
|
|
)
|
|
must.NotError(t, "MakeJoin", err)
|
|
|
|
// Step 2: Alice sends a message on hs1. This advances the DAG past the
|
|
// point captured by make_join's prev_events. The Complement server is not
|
|
// yet in the room, so it won't receive this event via normal federation.
|
|
messageEventID.Store(alice.SendEventSynced(t, roomID, b.Event{
|
|
Type: "m.room.message",
|
|
Content: map[string]interface{}{
|
|
"msgtype": "m.text",
|
|
"body": "Message sent between make_join and send_join",
|
|
},
|
|
}))
|
|
t.Logf("Alice sent message %s between make_join and send_join", messageEventID.Load())
|
|
|
|
// Step 3: Build and sign the join event, then send_join.
|
|
// The join event's prev_events are from step 1 (before the message),
|
|
// so persisting it on hs1 creates two forward extremities: the message
|
|
// and the join.
|
|
verImpl, err := gomatrixserverlib.GetRoomVersion(makeJoinResp.RoomVersion)
|
|
must.NotError(t, "GetRoomVersion", err)
|
|
eb := verImpl.NewEventBuilderFromProtoEvent(&makeJoinResp.JoinEvent)
|
|
joinEvent, err := eb.Build(time.Now(), srv.ServerName(), srv.KeyID, srv.Priv)
|
|
must.NotError(t, "Build join event", err)
|
|
|
|
_, err = fedClient.SendJoin(
|
|
context.Background(), origin,
|
|
deployment.GetFullyQualifiedHomeserverName(t, "hs1"),
|
|
joinEvent,
|
|
)
|
|
must.NotError(t, "SendJoin", err)
|
|
|
|
// Step 4: hs1 should make the missed message discoverable to the joining
|
|
// server. We accept either receiving the message event directly, or
|
|
// receiving any event whose prev_events reference it (allowing the
|
|
// joining server to backfill).
|
|
messageDiscoverableWaiter.Waitf(t, 5*time.Second,
|
|
"Timed out waiting for message event %s to become discoverable — "+
|
|
"the event sent between make_join and send_join was lost to the "+
|
|
"joining server", messageEventID.Load(),
|
|
)
|
|
}
|