diff --git a/db/view/view.cc b/db/view/view.cc index 9af9b9c432..69c16854c9 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1346,53 +1346,132 @@ bool view_updates::generate_partition_tombstone_update( } future<> view_update_builder::close() noexcept { - return when_all_succeed(_updates.close(), _existings->close()).discard_result(); + return when_all_succeed(_update_reader.close(), _existing_reader->close()).discard_result(); } -future view_update_builder::advance_all() { - auto existings_f = _existings ? (*_existings)() : make_ready_future(); - return when_all(_updates(), std::move(existings_f)).then([this] (auto&& fragments) mutable { - _update = std::move(std::get<0>(fragments).get()); - _existing = std::move(std::get<1>(fragments).get()); +future view_update_builder::read_both_next_fragments() { + auto existings_f = _existing_reader ? (*_existing_reader)() : make_ready_future(); + return when_all(_update_reader(), std::move(existings_f)).then([this] (auto&& fragments) mutable { + _update_fragment = std::move(std::get<0>(fragments).get()); + _existing_fragment = std::move(std::get<1>(fragments).get()); return stop_iteration::no; }); } -future view_update_builder::advance_updates() { - return _updates().then([this] (auto&& update) mutable { - _update = std::move(update); +future view_update_builder::read_next_update_fragment() { + return _update_reader().then([this] (auto&& update) mutable { + _update_fragment = std::move(update); return stop_iteration::no; }); } -future view_update_builder::advance_existings() { - if (!_existings) { +future view_update_builder::read_next_existing_fragment() { + if (!_existing_reader) { return make_ready_future(stop_iteration::no); } - return (*_existings)().then([this] (auto&& existing) mutable { - _existing = std::move(existing); + return (*_existing_reader)().then([this] (auto&& existing) mutable { + _existing_fragment = std::move(existing); return stop_iteration::no; }); } +void 
view_update_builder::consume_both_fragments() { + if (_update_fragment->is_range_tombstone_change()) { + SCYLLA_ASSERT(_existing_fragment->is_range_tombstone_change()); + _existing_current_tombstone = std::move(*_existing_fragment).as_range_tombstone_change().tombstone(); + _update_current_tombstone = std::move(*_update_fragment).as_range_tombstone_change().tombstone(); + } else if (_update_fragment->is_clustering_row()) { + SCYLLA_ASSERT(_existing_fragment->is_clustering_row()); + auto update = std::move(*_update_fragment).as_clustering_row(); + update.apply(std::max(_update_partition_tombstone, _update_current_tombstone)); + auto existing = std::move(*_existing_fragment).as_clustering_row(); + existing.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone)); + generate_update(std::move(update), { std::move(existing) }); + } else if (_update_fragment->is_static_row()) { + if (!_existing_fragment->is_static_row()) { + on_internal_error(vlogger, format("Static row update mutation part {} shouldn't compare equal with an existing, non-static row mutation part {}", + mutation_fragment_v2::printer(*_schema, *_update_fragment), mutation_fragment_v2::printer(*_schema, *_existing_fragment))); + } + generate_update(std::move(*_update_fragment).as_static_row(), _update_partition_tombstone, { std::move(*_existing_fragment).as_static_row() }, _existing_partition_tombstone); + + } +} + +void view_update_builder::consume_update_fragment() { + // We have an update where there was nothing before + if (_update_fragment->is_range_tombstone_change()) { + _update_current_tombstone = _update_fragment->as_range_tombstone_change().tombstone(); + } else if (_update_fragment->is_clustering_row()) { + auto update = std::move(*_update_fragment).as_clustering_row(); + update.apply(std::max(_update_partition_tombstone, _update_current_tombstone)); + auto tombstone = std::max(_existing_partition_tombstone, _existing_current_tombstone); + auto existing = tombstone + ? 
std::optional(std::in_place, update.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row()) + : std::nullopt; + generate_update(std::move(update), std::move(existing)); + } else if (_update_fragment->is_static_row()) { + auto update = std::move(*_update_fragment).as_static_row(); + auto tombstone = _existing_partition_tombstone; + auto existing = tombstone + ? std::optional(std::in_place) + : std::nullopt; + generate_update(std::move(update), _update_partition_tombstone, std::move(existing), _existing_partition_tombstone); + } +} + +void view_update_builder::consume_existing_fragment() { + // We have something existing but no update (which will happen either because it's a range tombstone marker in + // existing, or because we've fetched the existing row due to some partition/range deletion in the updates). + // Due to how the read command for existing rows is constructed, it is also possible that a static + // row is included, even though we didn't modify it. + if (_existing_fragment->is_range_tombstone_change()) { + _existing_current_tombstone = _existing_fragment->as_range_tombstone_change().tombstone(); + } else if (_existing_fragment->is_clustering_row()) { + auto existing = std::move(*_existing_fragment).as_clustering_row(); + existing.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone)); + auto tombstone = std::max(_update_partition_tombstone, _update_current_tombstone); + // The way we build the read command used for existing rows, we should always have a non-empty + // tombstone, since we wouldn't have read the existing row otherwise. We don't SCYLLA_ASSERT that in case the + // read method ever changes. 
+ if (tombstone) { + auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row()); + generate_update(std::move(update), { std::move(existing) }); + } + } else if (_existing_fragment->is_static_row()) { + auto existing = std::move(*_existing_fragment).as_static_row(); + auto tombstone = _update_partition_tombstone; + // The static row might be unintentionally included when fetching existing clustering rows, + // even if the static row was not updated. We can detect it. A static row can be affected either by: + // + // 1. A static row in the update mutation + // 2. A partition tombstone in the update mutation + // + // If neither of those is present, this means that the static row is included accidentally. + // If we are here, this means that (1) is not present. The `if` that follows checks for (2). + if (tombstone) { + auto update = static_row(); + generate_update(std::move(update), _update_partition_tombstone, { std::move(existing) }, _existing_partition_tombstone); + } + } +} + future view_update_builder::stop() const { return make_ready_future(stop_iteration::yes); } future>> view_update_builder::build_some() { - (void)co_await advance_all(); - if (!_update && !_existing) { + (void)co_await read_both_next_fragments(); + if (!_update_fragment && !_existing_fragment) { // Tell the caller there is no more data to build. 
co_return std::nullopt; } bool do_advance_updates = false; bool do_advance_existings = false; bool is_partition_tombstone_applied_on_all_views = false; - if (_update && _update->is_partition_start()) { - _key = std::move(std::move(_update)->as_partition_start().key().key()); - _update_partition_tombstone = _update->as_partition_start().partition_tombstone(); + if (_update_fragment && _update_fragment->is_partition_start()) { + _key = _update_fragment->as_partition_start().key().key(); + _update_partition_tombstone = _update_fragment->as_partition_start().partition_tombstone(); do_advance_updates = true; - if (_update_partition_tombstone) { // For views that have the same partition key as base, generate an update of partition tombstone to delete // the entire partition in one operation, instead of generating an update for each row. @@ -1403,26 +1482,26 @@ future>> view_up } } } - if (_existing && _existing->is_partition_start()) { - _existing_partition_tombstone = _existing->as_partition_start().partition_tombstone(); + if (_existing_fragment && _existing_fragment->is_partition_start()) { + _existing_partition_tombstone = _existing_fragment->as_partition_start().partition_tombstone(); do_advance_existings = true; } if (do_advance_updates) { - co_await (do_advance_existings ? advance_all() : advance_updates()); + co_await (do_advance_existings ? 
read_both_next_fragments() : read_next_update_fragment()); } else if (do_advance_existings) { - co_await advance_existings(); + co_await read_next_existing_fragment(); } - if (utils::get_local_injector().enter("keep_mv_read_semaphore_units_10ms_longer") && _existing && _existing->is_clustering_row()) { + if (utils::get_local_injector().enter("keep_mv_read_semaphore_units_10ms_longer") && _existing_fragment && _existing_fragment->is_clustering_row()) { co_await seastar::sleep(std::chrono::milliseconds(10)); } // If the partition tombstone update is applied to all the views and there are no other updates, we can skip going over // all the rows trying to generate row updates, because the partition tombstones already cover everything. - if (is_partition_tombstone_applied_on_all_views && _update->is_end_of_partition()) { + if (is_partition_tombstone_applied_on_all_views && _update_fragment->is_end_of_partition()) { _skip_row_updates = true; } - while (!_skip_row_updates && co_await on_results() == stop_iteration::no) {}; + while (!_skip_row_updates && co_await generate_updates() == stop_iteration::no) {}; utils::chunked_vector mutations; for (auto& update : _view_updates) { @@ -1490,7 +1569,7 @@ void view_update_builder::generate_update(static_row&& update, const tombstone& } } -future view_update_builder::on_results() { +future view_update_builder::generate_updates() { constexpr size_t max_rows_for_view_updates = 100; auto should_stop_updates = [this] () -> bool { size_t rows_for_view_updates = std::accumulate(_view_updates.begin(), _view_updates.end(), 0, [] (size_t acc, const view_updates& vu) { @@ -1498,128 +1577,29 @@ future view_update_builder::on_results() { }); return rows_for_view_updates >= max_rows_for_view_updates; }; - if (_update && !_update->is_end_of_partition() && _existing && !_existing->is_end_of_partition()) { - auto cmp = position_in_partition::tri_compare(*_schema)(_update->position(), _existing->position()); + if (_update_fragment && 
!_update_fragment->is_end_of_partition() && _existing_fragment && !_existing_fragment->is_end_of_partition()) { + auto cmp = position_in_partition::tri_compare(*_schema)(_update_fragment->position(), _existing_fragment->position()); if (cmp < 0) { - // We have an update where there was nothing before - if (_update->is_range_tombstone_change()) { - _update_current_tombstone = _update->as_range_tombstone_change().tombstone(); - } else if (_update->is_clustering_row()) { - auto update = std::move(*_update).as_clustering_row(); - update.apply(std::max(_update_partition_tombstone, _update_current_tombstone)); - auto tombstone = std::max(_existing_partition_tombstone, _existing_current_tombstone); - auto existing = tombstone - ? std::optional(std::in_place, update.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row()) - : std::nullopt; - generate_update(std::move(update), std::move(existing)); - } else if (_update->is_static_row()) { - auto update = std::move(*_update).as_static_row(); - auto tombstone = _existing_partition_tombstone; - auto existing = tombstone - ? std::optional(std::in_place) - : std::nullopt; - generate_update(std::move(update), _update_partition_tombstone, std::move(existing), _existing_partition_tombstone); - } - return should_stop_updates() ? stop() : advance_updates(); + consume_update_fragment(); + return should_stop_updates() ? stop() : read_next_update_fragment(); } if (cmp > 0) { - // We have something existing but no update (which will happen either because it's a range tombstone marker in - // existing, or because we've fetched the existing row due to some partition/range deletion in the updates). - // Due to how the read command for existing rows is constructed, it is also possible that there is a static - // row is included, even though we didn't modify it. 
- if (_existing->is_range_tombstone_change()) { - _existing_current_tombstone = _existing->as_range_tombstone_change().tombstone(); - } else if (_existing->is_clustering_row()) { - auto existing = std::move(*_existing).as_clustering_row(); - existing.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone)); - auto tombstone = std::max(_update_partition_tombstone, _update_current_tombstone); - // The way we build the read command used for existing rows, we should always have a non-empty - // tombstone, since we wouldn't have read the existing row otherwise. We don't SCYLLA_ASSERT that in case the - // read method ever changes. - if (tombstone) { - auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row()); - generate_update(std::move(update), { std::move(existing) }); - } - } else if (_existing->is_static_row()) { - auto existing = std::move(*_existing).as_static_row(); - auto tombstone = _update_partition_tombstone; - // The static row might be unintentionally included when fetching existing clustering rows, - // even if the static row was not updated. We can detect it. A static row can be affected either by: - // - // 1. A static row in the update mutation - // 2. A partition tombstone in the update mutation - // - // If neither of those is present, this means that the static row is included accidentally. - // If we are here, this means that (1) is not present. The `if` that follows checks for (2). - if (tombstone) { - auto update = static_row(); - generate_update(std::move(update), _update_partition_tombstone, { std::move(existing) }, _existing_partition_tombstone); - } - } - return should_stop_updates() ? stop () : advance_existings(); + consume_existing_fragment(); + return should_stop_updates() ? 
stop () : read_next_existing_fragment(); } // We're updating a row that had pre-existing data - if (_update->is_range_tombstone_change()) { - SCYLLA_ASSERT(_existing->is_range_tombstone_change()); - _existing_current_tombstone = std::move(*_existing).as_range_tombstone_change().tombstone(); - _update_current_tombstone = std::move(*_update).as_range_tombstone_change().tombstone(); - } else if (_update->is_clustering_row()) { - SCYLLA_ASSERT(_existing->is_clustering_row()); - _update->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable { - cr.apply(std::max(_update_partition_tombstone, _update_current_tombstone)); - }); - _existing->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable { - cr.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone)); - }); - generate_update(std::move(*_update).as_clustering_row(), { std::move(*_existing).as_clustering_row() }); - } else if (_update->is_static_row()) { - if (!_existing->is_static_row()) { - on_internal_error(vlogger, format("Static row update mutation part {} shouldn't compare equal with an existing, non-static row mutation part {}", - mutation_fragment_v2::printer(*_schema, *_update), mutation_fragment_v2::printer(*_schema, *_existing))); - } - generate_update(std::move(*_update).as_static_row(), _update_partition_tombstone, { std::move(*_existing).as_static_row() }, _existing_partition_tombstone); - - } - return should_stop_updates() ? stop() : advance_all(); + consume_both_fragments(); + return should_stop_updates() ? 
stop() : read_both_next_fragments(); } - auto tombstone = std::max(_update_partition_tombstone, _update_current_tombstone); - if (tombstone && _existing && !_existing->is_end_of_partition()) { - if (_existing->is_range_tombstone_change()) { - _existing_current_tombstone = _existing->as_range_tombstone_change().tombstone(); - } else if (_existing->is_clustering_row()) { - auto existing = clustering_row(*_schema, _existing->as_clustering_row()); - existing.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone)); - auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row()); - generate_update(std::move(update), { std::move(existing) }); - } else if (_existing->is_static_row()) { - auto existing = static_row(*_schema, _existing->as_static_row()); - auto update = static_row(); - generate_update(std::move(update), _update_partition_tombstone, { std::move(existing) }, _existing_partition_tombstone); - } - return should_stop_updates() ? stop() : advance_existings(); + if (_existing_fragment && !_existing_fragment->is_end_of_partition()) { + consume_existing_fragment(); + return should_stop_updates() ? stop() : read_next_existing_fragment(); } - if (_update && !_update->is_end_of_partition()) { - if (_update->is_range_tombstone_change()) { - _update_current_tombstone = _update->as_range_tombstone_change().tombstone(); - } else if (_update->is_clustering_row()) { - _update->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable { - cr.apply(std::max(_update_partition_tombstone, _update_current_tombstone)); - }); - auto existing_tombstone = std::max(_existing_partition_tombstone, _existing_current_tombstone); - auto existing = existing_tombstone - ? 
std::optional(std::in_place, _update->as_clustering_row().key(), row_tombstone(std::move(existing_tombstone)), row_marker(), ::row()) - : std::nullopt; - generate_update(std::move(*_update).as_clustering_row(), std::move(existing)); - } else if (_update->is_static_row()) { - auto existing_tombstone = _existing_partition_tombstone; - auto existing = existing_tombstone - ? std::optional(std::in_place) - : std::nullopt; - generate_update(std::move(*_update).as_static_row(), _update_partition_tombstone, std::move(existing), _existing_partition_tombstone); - } - return should_stop_updates() ? stop() : advance_updates(); + if (_update_fragment && !_update_fragment->is_end_of_partition()) { + consume_update_fragment(); + return should_stop_updates() ? stop() : read_next_update_fragment(); } return stop(); diff --git a/db/view/view.hh b/db/view/view.hh index fe797a4022..5c32f4bd15 100644 --- a/db/view/view.hh +++ b/db/view/view.hh @@ -203,14 +203,14 @@ class view_update_builder { const replica::table& _base; schema_ptr _schema; // The base schema std::vector _view_updates; - mutation_reader _updates; - mutation_reader_opt _existings; + mutation_reader _update_reader; + mutation_reader_opt _existing_reader; tombstone _update_partition_tombstone; tombstone _update_current_tombstone; tombstone _existing_partition_tombstone; tombstone _existing_current_tombstone; - mutation_fragment_v2_opt _update; - mutation_fragment_v2_opt _existing; + mutation_fragment_v2_opt _update_fragment; + mutation_fragment_v2_opt _existing_fragment; gc_clock::time_point _now; partition_key _key = partition_key::make_empty(); bool _skip_row_updates = false; @@ -225,12 +225,15 @@ public: , _base(base) , _schema(std::move(s)) , _view_updates(std::move(views_to_update)) - , _updates(std::move(updates)) - , _existings(std::move(existings)) + , _update_reader(std::move(updates)) + , _existing_reader(std::move(existings)) , _now(now) { } view_update_builder(view_update_builder&& other) noexcept = default; + 
// One builder instance processes at most one base partition, but it may + // need multiple build_some() calls to emit all of that partition's view + // updates. // build_some() works on batches of 100 (max_rows_for_view_updates) // updated rows, but can_skip_view_updates() can decide that some of @@ -246,15 +249,22 @@ public: private: void generate_update(clustering_row&& update, std::optional&& existing); void generate_update(static_row&& update, const tombstone& update_tomb, std::optional&& existing, const tombstone& existing_tomb); - future on_results(); - future advance_all(); - future advance_updates(); - future advance_existings(); + // generate updates from the read fragments and read new fragments for the next iteration + future generate_updates(); + + future read_both_next_fragments(); + future read_next_update_fragment(); + future read_next_existing_fragment(); + + void consume_both_fragments(); + void consume_update_fragment(); + void consume_existing_fragment(); future stop() const; }; +// The readers provided for the view_update_builder should span the same single partition. view_update_builder make_view_update_builder( data_dictionary::database db, const replica::table& base_table, diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index aa054bc957..266ea7e4b8 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -375,7 +375,7 @@ static size_t memory_usage_of(const utils::chunked_vector view_update_generator::populate_views(const replica::table& table,