use std::collections::HashSet; use indexmap::IndexSet; use itertools::Itertools; use super::{Batch, Gap, OrderKey, StitchedItem, StitcherBackend}; /// Updates to a gap in the stitched order. #[derive(Debug)] pub struct GapUpdate<'id, K: OrderKey> { /// The opaque key of the gap to update. pub key: K, /// The new contents of the gap. If this is empty, the gap should be /// deleted. pub gap: Gap, /// New items to insert after the gap. These items _should not_ be /// synchronized to clients. pub inserted_items: Vec>, } /// Updates to the stitched order. #[derive(Debug)] pub struct OrderUpdates<'id, K: OrderKey> { /// Updates to individual gaps. The items inserted by these updates _should /// not_ be synchronized to clients. pub gap_updates: Vec>, /// New items to append to the end of the order. These items _should_ be /// synchronized to clients. pub new_items: Vec>, // The subset of events in the batch which got slotted into an existing gap. This is tracked // for unit testing and may eventually be sent to clients. pub events_added_to_gaps: HashSet<&'id str>, } /// The stitcher, which implements the stitched ordering algorithm. /// Its primary method is [`Stitcher::stitch`]. pub struct Stitcher<'backend, B: StitcherBackend> { backend: &'backend B, } impl Stitcher<'_, B> { /// Create a new [`Stitcher`] given a [`StitcherBackend`]. pub fn new(backend: &B) -> Stitcher<'_, B> { Stitcher { backend } } /// Given a [`Batch`], compute the [`OrderUpdates`] which should be made to /// the stitched order to incorporate that batch. It is the responsibility /// of the caller to apply the updates. pub fn stitch<'id>(&self, batch: &Batch<'id>) -> OrderUpdates<'id, B::Key> { let mut gap_updates = Vec::new(); let mut events_added_to_gaps: HashSet<&'id str> = HashSet::new(); // Events in the batch which haven't been fitted into a gap or appended to the // end yet. let mut remaining_events: IndexSet<_> = batch.events().collect(); // 1: Find existing gaps which include IDs of events in `batch` let matching_gaps = self.backend.find_matching_gaps(batch.events()); // Repeat steps 2-9 for each matching gap for (key, mut gap) in matching_gaps { // 2. Find events in `batch` which are mentioned in `gap` let matching_events = remaining_events.iter().filter(|id| gap.contains(**id)); // Extend `events_added_to_gaps` with the matching events, which are destined to // be slotted into gaps. events_added_to_gaps.extend(matching_events.clone()); // 3. Create the to-insert list from the predecessor sets of each matching event let events_to_insert: Vec<_> = matching_events .filter_map(|event| batch.predecessors(event)) .flat_map(|predecessors| predecessors.predecessor_set.iter()) .filter(|event| remaining_events.contains(*event)) .copied() .collect(); // 4. Remove the events in the to-insert list from `remaining_events` so they // aren't processed again remaining_events.retain(|event| !events_to_insert.contains(event)); // 5 and 6 let inserted_items = self.sort_events_and_create_gaps(batch, events_to_insert); // 8. Update gap gap.retain(|id| !batch.contains(id)); // 7 and 9. Append to-insert list and delete gap if empty // The actual work of mutating the order is handled by the callee, // we just record an update to make. gap_updates.push(GapUpdate { key: key.clone(), gap, inserted_items }); } // 10. Append remaining events and gaps let new_items = self.sort_events_and_create_gaps(batch, remaining_events); OrderUpdates { gap_updates, new_items, events_added_to_gaps, } } fn sort_events_and_create_gaps<'id>( &self, batch: &Batch<'id>, events_to_insert: impl IntoIterator, ) -> Vec> { // 5. Sort the to-insert list with DAG;received order let events_to_insert = events_to_insert .into_iter() .sorted_by(batch.compare_by_dag_received()) .collect_vec(); // allocate 1.5x the size of the to-insert list let items_capacity = events_to_insert .capacity() .saturating_add(events_to_insert.capacity().div_euclid(2)); let mut items = Vec::with_capacity(items_capacity); for event in events_to_insert { let missing_prev_events: HashSet = batch .predecessors(event) .expect("events in to_insert should be in batch") .prev_events .iter() .filter(|prev_event| { !(batch.contains(prev_event) || self.backend.event_exists(prev_event)) }) .map(|id| String::from(*id)) .collect(); if !missing_prev_events.is_empty() { items.push(StitchedItem::Gap(missing_prev_events)); } items.push(StitchedItem::Event(event)); } items } }