mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-23 08:12:08 +00:00
Merge 'Refactor view_update_builder' from Wojciech Mitros
This series improves the readability and structure of
view_update_builder, the component that generates materialized view
updates from base-table mutations.
The first four patches are pure renames and refactoring with no
semantic changes:
1. Document that the builder operates on a single base partition.
2. Rename member fields to clearly distinguish readers (the
mutation_reader streams) from the cached fragments (the last
mutation_fragment_v2 read from each stream).
3. Rename advance/on_results methods to names that describe what
they actually do: read the next fragment, or generate view
updates.
4. Extract partition-start handling into its own method.
The next two patches are minor optimizations:
5. Simplify clustering-row handling by moving the row out of the
fragment before applying the tombstone, avoiding an unnecessary
memory-usage recalculation in the reader permit.
6. Replace deep copies with moves in the existing-only tail path,
matching the pattern used everywhere else.
Finally, patch 7 deduplicates the fragment-consuming logic by
extracting the three repeated blocks into consume_both_fragments(),
consume_update_fragment(), and consume_existing_fragment().
Code reorganization - no backport needed
Closes scylladb/scylladb#29497
* github.com:scylladb/scylladb:
mv: deduplicate code for consuming fragments in view_update_builder
mv: avoid unnecessary copies of existing rows in generate_updates()
mv: simplify clustering row handling in generate_updates()
mv: rename methods in view_update_builder for clarity
mv: rename view_update_builder readers and cached fragments
mv: drop redundant std::move from partition key extraction
mv: document single-partition builder scope
This commit is contained in:
260
db/view/view.cc
260
db/view/view.cc
@@ -1346,53 +1346,132 @@ bool view_updates::generate_partition_tombstone_update(
|
||||
}
|
||||
|
||||
future<> view_update_builder::close() noexcept {
|
||||
return when_all_succeed(_updates.close(), _existings->close()).discard_result();
|
||||
return when_all_succeed(_update_reader.close(), _existing_reader->close()).discard_result();
|
||||
}
|
||||
|
||||
future<stop_iteration> view_update_builder::advance_all() {
|
||||
auto existings_f = _existings ? (*_existings)() : make_ready_future<mutation_fragment_v2_opt>();
|
||||
return when_all(_updates(), std::move(existings_f)).then([this] (auto&& fragments) mutable {
|
||||
_update = std::move(std::get<0>(fragments).get());
|
||||
_existing = std::move(std::get<1>(fragments).get());
|
||||
future<stop_iteration> view_update_builder::read_both_next_fragments() {
|
||||
auto existings_f = _existing_reader ? (*_existing_reader)() : make_ready_future<mutation_fragment_v2_opt>();
|
||||
return when_all(_update_reader(), std::move(existings_f)).then([this] (auto&& fragments) mutable {
|
||||
_update_fragment = std::move(std::get<0>(fragments).get());
|
||||
_existing_fragment = std::move(std::get<1>(fragments).get());
|
||||
return stop_iteration::no;
|
||||
});
|
||||
}
|
||||
|
||||
future<stop_iteration> view_update_builder::advance_updates() {
|
||||
return _updates().then([this] (auto&& update) mutable {
|
||||
_update = std::move(update);
|
||||
future<stop_iteration> view_update_builder::read_next_update_fragment() {
|
||||
return _update_reader().then([this] (auto&& update) mutable {
|
||||
_update_fragment = std::move(update);
|
||||
return stop_iteration::no;
|
||||
});
|
||||
}
|
||||
|
||||
future<stop_iteration> view_update_builder::advance_existings() {
|
||||
if (!_existings) {
|
||||
future<stop_iteration> view_update_builder::read_next_existing_fragment() {
|
||||
if (!_existing_reader) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}
|
||||
return (*_existings)().then([this] (auto&& existing) mutable {
|
||||
_existing = std::move(existing);
|
||||
return (*_existing_reader)().then([this] (auto&& existing) mutable {
|
||||
_existing_fragment = std::move(existing);
|
||||
return stop_iteration::no;
|
||||
});
|
||||
}
|
||||
|
||||
void view_update_builder::consume_both_fragments() {
|
||||
if (_update_fragment->is_range_tombstone_change()) {
|
||||
SCYLLA_ASSERT(_existing_fragment->is_range_tombstone_change());
|
||||
_existing_current_tombstone = std::move(*_existing_fragment).as_range_tombstone_change().tombstone();
|
||||
_update_current_tombstone = std::move(*_update_fragment).as_range_tombstone_change().tombstone();
|
||||
} else if (_update_fragment->is_clustering_row()) {
|
||||
SCYLLA_ASSERT(_existing_fragment->is_clustering_row());
|
||||
auto update = std::move(*_update_fragment).as_clustering_row();
|
||||
update.apply(std::max(_update_partition_tombstone, _update_current_tombstone));
|
||||
auto existing = std::move(*_existing_fragment).as_clustering_row();
|
||||
existing.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone));
|
||||
generate_update(std::move(update), { std::move(existing) });
|
||||
} else if (_update_fragment->is_static_row()) {
|
||||
if (!_existing_fragment->is_static_row()) {
|
||||
on_internal_error(vlogger, format("Static row update mutation part {} shouldn't compare equal with an existing, non-static row mutation part {}",
|
||||
mutation_fragment_v2::printer(*_schema, *_update_fragment), mutation_fragment_v2::printer(*_schema, *_existing_fragment)));
|
||||
}
|
||||
generate_update(std::move(*_update_fragment).as_static_row(), _update_partition_tombstone, { std::move(*_existing_fragment).as_static_row() }, _existing_partition_tombstone);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
void view_update_builder::consume_update_fragment() {
|
||||
// We have an update where there was nothing before
|
||||
if (_update_fragment->is_range_tombstone_change()) {
|
||||
_update_current_tombstone = _update_fragment->as_range_tombstone_change().tombstone();
|
||||
} else if (_update_fragment->is_clustering_row()) {
|
||||
auto update = std::move(*_update_fragment).as_clustering_row();
|
||||
update.apply(std::max(_update_partition_tombstone, _update_current_tombstone));
|
||||
auto tombstone = std::max(_existing_partition_tombstone, _existing_current_tombstone);
|
||||
auto existing = tombstone
|
||||
? std::optional<clustering_row>(std::in_place, update.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row())
|
||||
: std::nullopt;
|
||||
generate_update(std::move(update), std::move(existing));
|
||||
} else if (_update_fragment->is_static_row()) {
|
||||
auto update = std::move(*_update_fragment).as_static_row();
|
||||
auto tombstone = _existing_partition_tombstone;
|
||||
auto existing = tombstone
|
||||
? std::optional<static_row>(std::in_place)
|
||||
: std::nullopt;
|
||||
generate_update(std::move(update), _update_partition_tombstone, std::move(existing), _existing_partition_tombstone);
|
||||
}
|
||||
}
|
||||
|
||||
void view_update_builder::consume_existing_fragment() {
|
||||
// We have something existing but no update (which will happen either because it's a range tombstone marker in
|
||||
// existing, or because we've fetched the existing row due to some partition/range deletion in the updates).
|
||||
// Due to how the read command for existing rows is constructed, it is also possible that there is a static
|
||||
// row is included, even though we didn't modify it.
|
||||
if (_existing_fragment->is_range_tombstone_change()) {
|
||||
_existing_current_tombstone = _existing_fragment->as_range_tombstone_change().tombstone();
|
||||
} else if (_existing_fragment->is_clustering_row()) {
|
||||
auto existing = std::move(*_existing_fragment).as_clustering_row();
|
||||
existing.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone));
|
||||
auto tombstone = std::max(_update_partition_tombstone, _update_current_tombstone);
|
||||
// The way we build the read command used for existing rows, we should always have a non-empty
|
||||
// tombstone, since we wouldn't have read the existing row otherwise. We don't SCYLLA_ASSERT that in case the
|
||||
// read method ever changes.
|
||||
if (tombstone) {
|
||||
auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row());
|
||||
generate_update(std::move(update), { std::move(existing) });
|
||||
}
|
||||
} else if (_existing_fragment->is_static_row()) {
|
||||
auto existing = std::move(*_existing_fragment).as_static_row();
|
||||
auto tombstone = _update_partition_tombstone;
|
||||
// The static row might be unintentionally included when fetching existing clustering rows,
|
||||
// even if the static row was not updated. We can detect it. A static row can be affected either by:
|
||||
//
|
||||
// 1. A static row in the update mutation
|
||||
// 2. A partition tombstone in the update mutation
|
||||
//
|
||||
// If neither of those is present, this means that the static row is included accidentally.
|
||||
// If we are here, this means that (1) is not present. The `if` that follows checks for (2).
|
||||
if (tombstone) {
|
||||
auto update = static_row();
|
||||
generate_update(std::move(update), _update_partition_tombstone, { std::move(existing) }, _existing_partition_tombstone);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
future<stop_iteration> view_update_builder::stop() const {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
|
||||
future<std::optional<utils::chunked_vector<frozen_mutation_and_schema>>> view_update_builder::build_some() {
|
||||
(void)co_await advance_all();
|
||||
if (!_update && !_existing) {
|
||||
(void)co_await read_both_next_fragments();
|
||||
if (!_update_fragment && !_existing_fragment) {
|
||||
// Tell the caller there is no more data to build.
|
||||
co_return std::nullopt;
|
||||
}
|
||||
bool do_advance_updates = false;
|
||||
bool do_advance_existings = false;
|
||||
bool is_partition_tombstone_applied_on_all_views = false;
|
||||
if (_update && _update->is_partition_start()) {
|
||||
_key = std::move(std::move(_update)->as_partition_start().key().key());
|
||||
_update_partition_tombstone = _update->as_partition_start().partition_tombstone();
|
||||
if (_update_fragment && _update_fragment->is_partition_start()) {
|
||||
_key = _update_fragment->as_partition_start().key().key();
|
||||
_update_partition_tombstone = _update_fragment->as_partition_start().partition_tombstone();
|
||||
do_advance_updates = true;
|
||||
|
||||
if (_update_partition_tombstone) {
|
||||
// For views that have the same partition key as base, generate an update of partition tombstone to delete
|
||||
// the entire partition in one operation, instead of generating an update for each row.
|
||||
@@ -1403,26 +1482,26 @@ future<std::optional<utils::chunked_vector<frozen_mutation_and_schema>>> view_up
|
||||
}
|
||||
}
|
||||
}
|
||||
if (_existing && _existing->is_partition_start()) {
|
||||
_existing_partition_tombstone = _existing->as_partition_start().partition_tombstone();
|
||||
if (_existing_fragment && _existing_fragment->is_partition_start()) {
|
||||
_existing_partition_tombstone = _existing_fragment->as_partition_start().partition_tombstone();
|
||||
do_advance_existings = true;
|
||||
}
|
||||
if (do_advance_updates) {
|
||||
co_await (do_advance_existings ? advance_all() : advance_updates());
|
||||
co_await (do_advance_existings ? read_both_next_fragments() : read_next_update_fragment());
|
||||
} else if (do_advance_existings) {
|
||||
co_await advance_existings();
|
||||
co_await read_next_existing_fragment();
|
||||
}
|
||||
if (utils::get_local_injector().enter("keep_mv_read_semaphore_units_10ms_longer") && _existing && _existing->is_clustering_row()) {
|
||||
if (utils::get_local_injector().enter("keep_mv_read_semaphore_units_10ms_longer") && _existing_fragment && _existing_fragment->is_clustering_row()) {
|
||||
co_await seastar::sleep(std::chrono::milliseconds(10));
|
||||
}
|
||||
|
||||
// If the partition tombstone update is applied to all the views and there are no other updates, we can skip going over
|
||||
// all the rows trying to generate row updates, because the partition tombstones already cover everything.
|
||||
if (is_partition_tombstone_applied_on_all_views && _update->is_end_of_partition()) {
|
||||
if (is_partition_tombstone_applied_on_all_views && _update_fragment->is_end_of_partition()) {
|
||||
_skip_row_updates = true;
|
||||
}
|
||||
|
||||
while (!_skip_row_updates && co_await on_results() == stop_iteration::no) {};
|
||||
while (!_skip_row_updates && co_await generate_updates() == stop_iteration::no) {};
|
||||
|
||||
utils::chunked_vector<frozen_mutation_and_schema> mutations;
|
||||
for (auto& update : _view_updates) {
|
||||
@@ -1490,7 +1569,7 @@ void view_update_builder::generate_update(static_row&& update, const tombstone&
|
||||
}
|
||||
}
|
||||
|
||||
future<stop_iteration> view_update_builder::on_results() {
|
||||
future<stop_iteration> view_update_builder::generate_updates() {
|
||||
constexpr size_t max_rows_for_view_updates = 100;
|
||||
auto should_stop_updates = [this] () -> bool {
|
||||
size_t rows_for_view_updates = std::accumulate(_view_updates.begin(), _view_updates.end(), 0, [] (size_t acc, const view_updates& vu) {
|
||||
@@ -1498,128 +1577,29 @@ future<stop_iteration> view_update_builder::on_results() {
|
||||
});
|
||||
return rows_for_view_updates >= max_rows_for_view_updates;
|
||||
};
|
||||
if (_update && !_update->is_end_of_partition() && _existing && !_existing->is_end_of_partition()) {
|
||||
auto cmp = position_in_partition::tri_compare(*_schema)(_update->position(), _existing->position());
|
||||
if (_update_fragment && !_update_fragment->is_end_of_partition() && _existing_fragment && !_existing_fragment->is_end_of_partition()) {
|
||||
auto cmp = position_in_partition::tri_compare(*_schema)(_update_fragment->position(), _existing_fragment->position());
|
||||
if (cmp < 0) {
|
||||
// We have an update where there was nothing before
|
||||
if (_update->is_range_tombstone_change()) {
|
||||
_update_current_tombstone = _update->as_range_tombstone_change().tombstone();
|
||||
} else if (_update->is_clustering_row()) {
|
||||
auto update = std::move(*_update).as_clustering_row();
|
||||
update.apply(std::max(_update_partition_tombstone, _update_current_tombstone));
|
||||
auto tombstone = std::max(_existing_partition_tombstone, _existing_current_tombstone);
|
||||
auto existing = tombstone
|
||||
? std::optional<clustering_row>(std::in_place, update.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row())
|
||||
: std::nullopt;
|
||||
generate_update(std::move(update), std::move(existing));
|
||||
} else if (_update->is_static_row()) {
|
||||
auto update = std::move(*_update).as_static_row();
|
||||
auto tombstone = _existing_partition_tombstone;
|
||||
auto existing = tombstone
|
||||
? std::optional<static_row>(std::in_place)
|
||||
: std::nullopt;
|
||||
generate_update(std::move(update), _update_partition_tombstone, std::move(existing), _existing_partition_tombstone);
|
||||
}
|
||||
return should_stop_updates() ? stop() : advance_updates();
|
||||
consume_update_fragment();
|
||||
return should_stop_updates() ? stop() : read_next_update_fragment();
|
||||
}
|
||||
if (cmp > 0) {
|
||||
// We have something existing but no update (which will happen either because it's a range tombstone marker in
|
||||
// existing, or because we've fetched the existing row due to some partition/range deletion in the updates).
|
||||
// Due to how the read command for existing rows is constructed, it is also possible that there is a static
|
||||
// row is included, even though we didn't modify it.
|
||||
if (_existing->is_range_tombstone_change()) {
|
||||
_existing_current_tombstone = _existing->as_range_tombstone_change().tombstone();
|
||||
} else if (_existing->is_clustering_row()) {
|
||||
auto existing = std::move(*_existing).as_clustering_row();
|
||||
existing.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone));
|
||||
auto tombstone = std::max(_update_partition_tombstone, _update_current_tombstone);
|
||||
// The way we build the read command used for existing rows, we should always have a non-empty
|
||||
// tombstone, since we wouldn't have read the existing row otherwise. We don't SCYLLA_ASSERT that in case the
|
||||
// read method ever changes.
|
||||
if (tombstone) {
|
||||
auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row());
|
||||
generate_update(std::move(update), { std::move(existing) });
|
||||
}
|
||||
} else if (_existing->is_static_row()) {
|
||||
auto existing = std::move(*_existing).as_static_row();
|
||||
auto tombstone = _update_partition_tombstone;
|
||||
// The static row might be unintentionally included when fetching existing clustering rows,
|
||||
// even if the static row was not updated. We can detect it. A static row can be affected either by:
|
||||
//
|
||||
// 1. A static row in the update mutation
|
||||
// 2. A partition tombstone in the update mutation
|
||||
//
|
||||
// If neither of those is present, this means that the static row is included accidentally.
|
||||
// If we are here, this means that (1) is not present. The `if` that follows checks for (2).
|
||||
if (tombstone) {
|
||||
auto update = static_row();
|
||||
generate_update(std::move(update), _update_partition_tombstone, { std::move(existing) }, _existing_partition_tombstone);
|
||||
}
|
||||
}
|
||||
return should_stop_updates() ? stop () : advance_existings();
|
||||
consume_existing_fragment();
|
||||
return should_stop_updates() ? stop () : read_next_existing_fragment();
|
||||
}
|
||||
// We're updating a row that had pre-existing data
|
||||
if (_update->is_range_tombstone_change()) {
|
||||
SCYLLA_ASSERT(_existing->is_range_tombstone_change());
|
||||
_existing_current_tombstone = std::move(*_existing).as_range_tombstone_change().tombstone();
|
||||
_update_current_tombstone = std::move(*_update).as_range_tombstone_change().tombstone();
|
||||
} else if (_update->is_clustering_row()) {
|
||||
SCYLLA_ASSERT(_existing->is_clustering_row());
|
||||
_update->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
|
||||
cr.apply(std::max(_update_partition_tombstone, _update_current_tombstone));
|
||||
});
|
||||
_existing->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
|
||||
cr.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone));
|
||||
});
|
||||
generate_update(std::move(*_update).as_clustering_row(), { std::move(*_existing).as_clustering_row() });
|
||||
} else if (_update->is_static_row()) {
|
||||
if (!_existing->is_static_row()) {
|
||||
on_internal_error(vlogger, format("Static row update mutation part {} shouldn't compare equal with an existing, non-static row mutation part {}",
|
||||
mutation_fragment_v2::printer(*_schema, *_update), mutation_fragment_v2::printer(*_schema, *_existing)));
|
||||
}
|
||||
generate_update(std::move(*_update).as_static_row(), _update_partition_tombstone, { std::move(*_existing).as_static_row() }, _existing_partition_tombstone);
|
||||
|
||||
}
|
||||
return should_stop_updates() ? stop() : advance_all();
|
||||
consume_both_fragments();
|
||||
return should_stop_updates() ? stop() : read_both_next_fragments();
|
||||
}
|
||||
|
||||
auto tombstone = std::max(_update_partition_tombstone, _update_current_tombstone);
|
||||
if (tombstone && _existing && !_existing->is_end_of_partition()) {
|
||||
if (_existing->is_range_tombstone_change()) {
|
||||
_existing_current_tombstone = _existing->as_range_tombstone_change().tombstone();
|
||||
} else if (_existing->is_clustering_row()) {
|
||||
auto existing = clustering_row(*_schema, _existing->as_clustering_row());
|
||||
existing.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone));
|
||||
auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row());
|
||||
generate_update(std::move(update), { std::move(existing) });
|
||||
} else if (_existing->is_static_row()) {
|
||||
auto existing = static_row(*_schema, _existing->as_static_row());
|
||||
auto update = static_row();
|
||||
generate_update(std::move(update), _update_partition_tombstone, { std::move(existing) }, _existing_partition_tombstone);
|
||||
}
|
||||
return should_stop_updates() ? stop() : advance_existings();
|
||||
if (_existing_fragment && !_existing_fragment->is_end_of_partition()) {
|
||||
consume_existing_fragment();
|
||||
return should_stop_updates() ? stop() : read_next_existing_fragment();
|
||||
}
|
||||
|
||||
if (_update && !_update->is_end_of_partition()) {
|
||||
if (_update->is_range_tombstone_change()) {
|
||||
_update_current_tombstone = _update->as_range_tombstone_change().tombstone();
|
||||
} else if (_update->is_clustering_row()) {
|
||||
_update->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
|
||||
cr.apply(std::max(_update_partition_tombstone, _update_current_tombstone));
|
||||
});
|
||||
auto existing_tombstone = std::max(_existing_partition_tombstone, _existing_current_tombstone);
|
||||
auto existing = existing_tombstone
|
||||
? std::optional<clustering_row>(std::in_place, _update->as_clustering_row().key(), row_tombstone(std::move(existing_tombstone)), row_marker(), ::row())
|
||||
: std::nullopt;
|
||||
generate_update(std::move(*_update).as_clustering_row(), std::move(existing));
|
||||
} else if (_update->is_static_row()) {
|
||||
auto existing_tombstone = _existing_partition_tombstone;
|
||||
auto existing = existing_tombstone
|
||||
? std::optional<static_row>(std::in_place)
|
||||
: std::nullopt;
|
||||
generate_update(std::move(*_update).as_static_row(), _update_partition_tombstone, std::move(existing), _existing_partition_tombstone);
|
||||
}
|
||||
return should_stop_updates() ? stop() : advance_updates();
|
||||
if (_update_fragment && !_update_fragment->is_end_of_partition()) {
|
||||
consume_update_fragment();
|
||||
return should_stop_updates() ? stop() : read_next_update_fragment();
|
||||
}
|
||||
|
||||
return stop();
|
||||
|
||||
@@ -203,14 +203,14 @@ class view_update_builder {
|
||||
const replica::table& _base;
|
||||
schema_ptr _schema; // The base schema
|
||||
std::vector<view_updates> _view_updates;
|
||||
mutation_reader _updates;
|
||||
mutation_reader_opt _existings;
|
||||
mutation_reader _update_reader;
|
||||
mutation_reader_opt _existing_reader;
|
||||
tombstone _update_partition_tombstone;
|
||||
tombstone _update_current_tombstone;
|
||||
tombstone _existing_partition_tombstone;
|
||||
tombstone _existing_current_tombstone;
|
||||
mutation_fragment_v2_opt _update;
|
||||
mutation_fragment_v2_opt _existing;
|
||||
mutation_fragment_v2_opt _update_fragment;
|
||||
mutation_fragment_v2_opt _existing_fragment;
|
||||
gc_clock::time_point _now;
|
||||
partition_key _key = partition_key::make_empty();
|
||||
bool _skip_row_updates = false;
|
||||
@@ -225,12 +225,15 @@ public:
|
||||
, _base(base)
|
||||
, _schema(std::move(s))
|
||||
, _view_updates(std::move(views_to_update))
|
||||
, _updates(std::move(updates))
|
||||
, _existings(std::move(existings))
|
||||
, _update_reader(std::move(updates))
|
||||
, _existing_reader(std::move(existings))
|
||||
, _now(now) {
|
||||
}
|
||||
view_update_builder(view_update_builder&& other) noexcept = default;
|
||||
|
||||
// One builder instance processes at most one base partition, but it may
|
||||
// need multiple build_some() calls to emit all of that partition's view
|
||||
// updates.
|
||||
|
||||
// build_some() works on batches of 100 (max_rows_for_view_updates)
|
||||
// updated rows, but can_skip_view_updates() can decide that some of
|
||||
@@ -246,15 +249,22 @@ public:
|
||||
private:
|
||||
void generate_update(clustering_row&& update, std::optional<clustering_row>&& existing);
|
||||
void generate_update(static_row&& update, const tombstone& update_tomb, std::optional<static_row>&& existing, const tombstone& existing_tomb);
|
||||
future<stop_iteration> on_results();
|
||||
|
||||
future<stop_iteration> advance_all();
|
||||
future<stop_iteration> advance_updates();
|
||||
future<stop_iteration> advance_existings();
|
||||
// generate updates from the read fragments and read new fragments for the next iteration
|
||||
future<stop_iteration> generate_updates();
|
||||
|
||||
future<stop_iteration> read_both_next_fragments();
|
||||
future<stop_iteration> read_next_update_fragment();
|
||||
future<stop_iteration> read_next_existing_fragment();
|
||||
|
||||
void consume_both_fragments();
|
||||
void consume_update_fragment();
|
||||
void consume_existing_fragment();
|
||||
|
||||
future<stop_iteration> stop() const;
|
||||
};
|
||||
|
||||
// The readers provided for the view_update_builder should span the same single partition.
|
||||
view_update_builder make_view_update_builder(
|
||||
data_dictionary::database db,
|
||||
const replica::table& base_table,
|
||||
|
||||
@@ -375,7 +375,7 @@ static size_t memory_usage_of(const utils::chunked_vector<frozen_mutation_and_sc
|
||||
*
|
||||
* @param views the affected views which need to be updated.
|
||||
* @param base_token The token to use to match the base replica with the paired replicas.
|
||||
* @param reader the base table updates being applied, which all correspond to the base token.
|
||||
* @param reader the base table updates being applied, which all correspond to one partition corresponding to the base token.
|
||||
* @return a future that resolves when the updates have been acknowledged by the view replicas
|
||||
*/
|
||||
future<> view_update_generator::populate_views(const replica::table& table,
|
||||
|
||||
Reference in New Issue
Block a user