Merge 'Minimize generated view updates for unselected column updates' from Piotr
"
This series addresses the issue of redundant view updates generated for
columns that were not selected for a given materialized view.

Cases covered (quote):
 * If a base row has a live row marker, then we can avoid generating
   view updates if only unselected columns change;
 * If a base row has no live row marker, then we can avoid generating
   view updates if unselected columns are updated, unless they are newly
   created, deleted, or they have a TTL.

Additionally, this series includes caching selected columns and is_index
information to avoid unnecessary CPU cycles spent on recomputing these two.

Fixes #3819
"

* 'send_less_view_updates_if_not_necessary_4' of https://github.com/psarna/scylla:
  tests: add cases for view update generation optimizations
  view: minimize generated view updates for unselected columns
  view: cache is_index for view pointer
  index: make non-pointer overload of is_index function
  index: avoid copying when checking for is_index
This commit is contained in:
@@ -143,9 +143,10 @@ void view_info::initialize_base_dependent_fields(const schema& base) {
|
||||
}
|
||||
|
||||
bool view_info::is_index() const {
|
||||
//TODO(sarna): result of this call can be cached instead of calling index_manager::is_index every time
|
||||
column_family& base_cf = service::get_local_storage_service().db().local().find_column_family(base_id());
|
||||
return base_cf.get_index_manager().is_index(view_ptr(_schema.shared_from_this()));
|
||||
if (!_is_index) {
|
||||
_is_index = service::get_local_storage_service().db().local().find_column_family(base_id()).get_index_manager().is_index(_schema);
|
||||
}
|
||||
return *_is_index;
|
||||
}
|
||||
|
||||
namespace db {
|
||||
@@ -250,6 +251,7 @@ private:
|
||||
row_marker compute_row_marker(const clustering_row& base_row) const;
|
||||
dht::token token_for(const partition_key& base_key);
|
||||
deletable_row& get_view_row(const partition_key& base_key, const clustering_row& update);
|
||||
bool can_skip_view_updates(const clustering_row& update, const clustering_row& existing) const;
|
||||
void create_entry(const partition_key& base_key, const clustering_row& update, gc_clock::time_point now);
|
||||
void delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now);
|
||||
void do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now);
|
||||
@@ -520,6 +522,54 @@ void view_updates::do_delete_old_entry(const partition_key& base_key, const clus
|
||||
r.apply(update.tomb());
|
||||
}
|
||||
|
||||
bool view_updates::can_skip_view_updates(const clustering_row& update, const clustering_row& existing) const {
|
||||
const row& existing_row = existing.cells();
|
||||
const row& updated_row = update.cells();
|
||||
|
||||
const bool has_nonexpiring_marker = existing.marker().is_live() && !existing.marker().is_expiring();
|
||||
return boost::algorithm::all_of(_base->regular_columns(), [this, &updated_row, &existing_row, has_nonexpiring_marker] (const column_definition& cdef) {
|
||||
const auto it = _view->columns_by_name().find(cdef.name());
|
||||
const bool column_is_selected = it != _view->columns_by_name().end() && !it->second->is_view_virtual();
|
||||
|
||||
//TODO(sarna): Optimize collections case - currently they do not go under optimization
|
||||
if (!cdef.is_atomic()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// We cannot skip if the value was created or deleted, unless we have a non-expiring marker
|
||||
const auto* existing_cell = existing_row.find_cell(cdef.id);
|
||||
const auto* updated_cell = updated_row.find_cell(cdef.id);
|
||||
if (existing_cell == nullptr || updated_cell == nullptr) {
|
||||
return existing_cell == updated_cell || (!column_is_selected && has_nonexpiring_marker);
|
||||
}
|
||||
atomic_cell_view existing_cell_view = existing_cell->as_atomic_cell(cdef);
|
||||
atomic_cell_view updated_cell_view = updated_cell->as_atomic_cell(cdef);
|
||||
|
||||
// We cannot skip when a selected column is changed
|
||||
if (column_is_selected) {
|
||||
return compare_atomic_cell_for_merge(existing_cell_view, updated_cell_view) == 0;
|
||||
}
|
||||
|
||||
// With non-expiring row marker, liveness checks below are not relevant
|
||||
if (has_nonexpiring_marker) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (existing_cell_view.is_live() != updated_cell_view.is_live()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// We cannot skip if the change updates TTL
|
||||
const bool existing_has_ttl = existing_cell_view.is_live_and_has_ttl();
|
||||
const bool updated_has_ttl = updated_cell_view.is_live_and_has_ttl();
|
||||
if (existing_has_ttl || updated_has_ttl) {
|
||||
return existing_has_ttl == updated_has_ttl && existing_cell_view.expiry() == updated_cell_view.expiry();
|
||||
}
|
||||
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates the updates to apply to the existing view entry given the base table row before
|
||||
* and after the update, assuming that the update hasn't changed to which view entry the
|
||||
@@ -540,6 +590,10 @@ void view_updates::update_entry(const partition_key& base_key, const clustering_
|
||||
return;
|
||||
}
|
||||
|
||||
if (can_skip_view_updates(update, existing)) {
|
||||
return;
|
||||
}
|
||||
|
||||
deletable_row& r = get_view_row(base_key, update);
|
||||
auto marker = compute_row_marker(update);
|
||||
r.apply(marker);
|
||||
|
||||
@@ -49,6 +49,7 @@
|
||||
#include "database.hh"
|
||||
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
#include <boost/algorithm/cxx11/any_of.hpp>
|
||||
|
||||
namespace secondary_index {
|
||||
|
||||
@@ -156,12 +157,13 @@ std::vector<index> secondary_index_manager::list_indexes() const {
|
||||
}
|
||||
|
||||
/**
 * Checks whether the given view is the backing table of an index.
 *
 * Convenience overload for callers holding a view_ptr: it dereferences the
 * pointer and defers to the schema-reference overload, which avoids building
 * the temporary vector returned by list_indexes() on every call.
 *
 * @param view the view to check
 * @return the result of is_index(const schema&) for the pointed-to schema
 */
bool secondary_index_manager::is_index(view_ptr view) const {
    return is_index(*view);
}
|
||||
|
||||
bool secondary_index_manager::is_index(const schema& s) const {
|
||||
return boost::algorithm::any_of(_indices | boost::adaptors::map_values, [&s] (const index& i) {
|
||||
return s.cf_name() == index_table_name(i.metadata().name());
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -76,6 +76,7 @@ public:
|
||||
std::vector<index_metadata> get_dependent_indices(const column_definition& cdef) const;
|
||||
std::vector<index> list_indexes() const;
|
||||
bool is_index(view_ptr) const;
|
||||
bool is_index(const schema& s) const;
|
||||
private:
|
||||
void add_index(const index_metadata& im);
|
||||
};
|
||||
|
||||
@@ -460,3 +460,135 @@ SEASTAR_TEST_CASE(test_view_update_generator) {
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Exercises the view update generation shortcut: base-row changes that only
// touch columns not selected by a view/index must produce no view updates,
// while TTL changes and changes to selected columns must still produce them.
SEASTAR_TEST_CASE(test_generating_view_updates_optimizations) {
    return do_with_cql_env_thread([] (cql_test_env& e) {
        // Three base tables: t1 and t2 each get a secondary index,
        // t3 gets a materialized view selecting a, b, c, e (but not d).
        e.execute_cql("CREATE TABLE t1 (id int primary key, v1 int, v2 int, v3 int)").get();
        e.execute_cql("CREATE TABLE t2 (id int, c int, v1 int, v2 int, primary key(id, c));").get();
        e.execute_cql("CREATE TABLE t3 (a int, b int, c int, d int, e int, PRIMARY KEY (a, b));").get();

        e.execute_cql("CREATE INDEX ON t1(v1);").get();
        e.execute_cql("CREATE INDEX ON t2(c);").get();
        e.execute_cql("CREATE MATERIALIZED VIEW mv AS SELECT a,b,c,e FROM t3 WHERE c IS NOT NULL AND a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (c, a, b);").get();

        auto schema_named = [&e] (const char* table) {
            return e.local_db().find_schema("ks", table);
        };
        schema_ptr t1 = schema_named("t1");
        schema_ptr t2 = schema_named("t2");
        schema_ptr t3 = schema_named("t3");
        schema_ptr t1_v1_idx_index = schema_named("t1_v1_idx_index");
        schema_ptr t2_c_idx_index = schema_named("t2_c_idx_index");
        schema_ptr mv = schema_named("mv");

        // Helpers building single-int keys.
        auto exploded_int = [] (int v) {
            return std::vector<bytes>{int32_type->decompose(v)};
        };
        auto decorated_key_for = [&exploded_int] (const schema_ptr& s, int v) {
            return dht::global_partitioner().decorate_key(*s, partition_key::from_exploded(exploded_int(v)));
        };
        auto ck_of = [&exploded_int] (int v) {
            return clustering_key_prefix::from_exploded(exploded_int(v));
        };

        // Runs view update generation for a single view, given the base
        // partition after (`after`) and before (`before`) the change.
        auto updates_for = [] (schema_ptr base, schema_ptr view, mutation after, mutation before) {
            return db::view::generate_view_updates(base,
                    std::vector<view_ptr>{view_ptr(view)},
                    flat_mutation_reader_from_mutations({std::move(after)}),
                    flat_mutation_reader_from_mutations({std::move(before)})).get0();
        };

        // ---- t1: index on v1, no clustering key --------------------------
        auto pk_t1 = decorated_key_for(t1, 1);
        mutation existing = mutation(t1, pk_t1);
        mutation updated = mutation(t1, pk_t1);

        auto put_t1 = [] (mutation& m, sstring col, int v) {
            m.set_cell(clustering_key_prefix::make_empty(), to_bytes(col), data_value(v), 0);
        };
        put_t1(existing, "v1", 1);
        put_t1(existing, "v2", 3);
        put_t1(existing, "v3", 2);
        put_t1(updated, "v1", 1);
        put_t1(updated, "v2", 4);
        put_t1(updated, "v3", 2);

        // Sub-case 1: updating not selected column without live row marker, no virtual columns
        BOOST_REQUIRE(updates_for(t1, t1_v1_idx_index, updated, existing).empty());

        // Sub-case 2: updating not selected column with live row marker, no virtual columns
        existing.partition().clustered_row(*t1, clustering_key_prefix::make_empty()).apply(row_marker(0));
        BOOST_REQUIRE(updates_for(t1, t1_v1_idx_index, updated, existing).empty());

        // ---- t2: index on clustering column c ----------------------------
        auto pk_t2 = decorated_key_for(t2, 1);
        existing = mutation(t2, pk_t2);
        updated = mutation(t2, pk_t2);

        auto put_t2 = [&ck_of] (mutation& m, sstring col, int v) {
            m.set_cell(ck_of(2), to_bytes(col), data_value(v), 0);
        };
        put_t2(updated, "v1", 1);
        put_t2(existing, "v1", 2);

        // Sub-case 3: updating not selected column without live row marker, the not selected one is virtual
        BOOST_REQUIRE(updates_for(t2, t2_c_idx_index, updated, existing).empty());

        // Sub-case 4: updating not selected column with live row marker, the not selected one is virtual
        existing.partition().clustered_row(*t2, ck_of(2)).apply(row_marker(0));
        BOOST_REQUIRE(updates_for(t2, t2_c_idx_index, updated, existing).empty());

        // ---- t3: materialized view mv ------------------------------------
        auto pk_t3 = decorated_key_for(t3, 1);
        const ttl_opt no_ttl{};
        auto put_t3 = [&ck_of] (mutation& m, sstring col, int v, ttl_opt ttl) {
            m.set_cell(ck_of(1), to_bytes(col), data_value(v), 0, ttl);
        };

        // Sub-case with updates 1: updating a column with TTL
        {
            mutation before(t3, pk_t3);
            mutation after(t3, pk_t3);
            put_t3(before, "c", 1, no_ttl);
            put_t3(before, "d", 1, std::chrono::hours(99999));
            put_t3(after, "c", 1, no_ttl);
            put_t3(after, "d", 2, no_ttl);
            BOOST_REQUIRE_EQUAL(updates_for(t3, mv, after, before).size(), 1);
        }

        // Sub-case with updates 2: updating column with the new one having TTL
        {
            mutation before(t3, pk_t3);
            mutation after(t3, pk_t3);
            put_t3(before, "c", 1, no_ttl);
            put_t3(before, "d", 1, no_ttl);
            put_t3(after, "c", 1, no_ttl);
            put_t3(after, "d", 2, std::chrono::hours(99999));
            BOOST_REQUIRE_EQUAL(updates_for(t3, mv, after, before).size(), 1);
        }

        // Sub-case with updates 3: updating column with both having different TTLs
        {
            mutation before(t3, pk_t3);
            mutation after(t3, pk_t3);
            put_t3(before, "c", 1, no_ttl);
            put_t3(before, "d", 1, std::chrono::hours(99999));
            put_t3(after, "c", 1, no_ttl);
            put_t3(after, "d", 2, std::chrono::hours(99971));
            BOOST_REQUIRE_EQUAL(updates_for(t3, mv, after, before).size(), 1);
        }

        // Sub-case with updates 4: column is selected and is mv key
        {
            mutation before(t3, pk_t3);
            mutation after(t3, pk_t3);
            put_t3(before, "c", 3, no_ttl);
            put_t3(after, "c", 5, no_ttl);
            BOOST_REQUIRE_EQUAL(updates_for(t3, mv, after, before).size(), 2); // one deletion, one insertion
        }

        // Sub-case with updates 5: column is selected as regular
        {
            mutation before(t3, pk_t3);
            mutation after(t3, pk_t3);
            put_t3(before, "c", 1, no_ttl);
            put_t3(before, "e", 1, no_ttl);
            put_t3(after, "c", 1, no_ttl);
            put_t3(after, "e", 2, no_ttl);
            BOOST_REQUIRE_EQUAL(updates_for(t3, mv, after, before).size(), 1);
        }
    });
}
|
||||
|
||||
@@ -35,6 +35,7 @@ class view_info final {
|
||||
mutable std::optional<dht::partition_range_vector> _partition_ranges;
|
||||
// Id of a regular base table column included in the view's PK, if any.
|
||||
mutable std::optional<column_id> _base_non_pk_column_in_view_pk;
|
||||
mutable std::optional<bool> _is_index;
|
||||
public:
|
||||
view_info(const schema& schema, const raw_view_info& raw_view_info);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user