From 38831888d2d07a85415597e1aeae5143023daed9 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Thu, 22 Feb 2018 11:59:13 +0000 Subject: [PATCH 01/22] db/system_keyspace: Include MV system tables in all_tables() Signed-off-by: Duarte Nunes --- db/system_keyspace.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index fdc6086109..e722b0f98c 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -1541,7 +1541,7 @@ std::vector all_tables() { r.insert(r.end(), { built_indexes(), hints(), batchlog(), paxos(), local(), peers(), peer_events(), range_xfers(), compactions_in_progress(), compaction_history(), - sstable_activity(), size_estimates(), + sstable_activity(), size_estimates(), v3::views_builds_in_progress(), v3::built_views(), }); // legacy schema r.insert(r.end(), { From 7811474697521531f544b05dbde6649089034da7 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Thu, 22 Feb 2018 11:59:56 +0000 Subject: [PATCH 02/22] db/system_keyspace: Add Scylla-specific MV system table When building a materialized view, we divide our work by shard, so we need to register which shard did what work in the in-progress system table. We also add the token we started at, which will enable some optimizations in the view building code. Signed-off-by: Duarte Nunes --- db/system_keyspace.cc | 17 +++++++++++++++++ db/system_keyspace.hh | 1 + 2 files changed, 18 insertions(+) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index e722b0f98c..24eedf089a 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -642,6 +642,22 @@ schema_ptr built_views() { return schema; } +schema_ptr scylla_views_builds_in_progress() { + static thread_local auto schema = [] { + auto id = generate_legacy_id(NAME, SCYLLA_VIEWS_BUILDS_IN_PROGRESS); + return schema_builder(NAME, SCYLLA_VIEWS_BUILDS_IN_PROGRESS, stdx::make_optional(id)) + .with_column("keyspace_name", utf8_type, column_kind::partition_key) + .with_column("view_name", utf8_type, column_kind::clustering_key) + .with_column("cpu_id", int32_type, column_kind::clustering_key) + .with_column("next_token", utf8_type) + .with_column("generation_number", int32_type) + .with_column("first_token", utf8_type) + .with_version(generate_schema_version(id)) + .build(); + }(); + return schema; +} + } // namespace legacy { @@ -1542,6 +1558,7 @@ std::vector all_tables() { peers(), peer_events(), range_xfers(), compactions_in_progress(), compaction_history(), sstable_activity(), size_estimates(), v3::views_builds_in_progress(), v3::built_views(), + v3::scylla_views_builds_in_progress(), }); // legacy schema r.insert(r.end(), { diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 1ad8dac1a3..d30cb50002 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -99,6 +99,7 @@ static constexpr auto SIZE_ESTIMATES = "size_estimates"; static constexpr auto AVAILABLE_RANGES = "available_ranges"; static constexpr auto VIEWS_BUILDS_IN_PROGRESS = "views_builds_in_progress"; static constexpr auto BUILT_VIEWS = "built_views"; +static constexpr auto SCYLLA_VIEWS_BUILDS_IN_PROGRESS = "scylla_views_builds_in_progress"; } namespace legacy { From b2cae7ea09f84f67a80d7062c9d204afe7faf0b0 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Thu, 22 Feb 2018 12:04:06 +0000 Subject: [PATCH 03/22] db/system_keyspace: Add virtual reader for MV in-progress build status Provide a virtual reader so users can query the in-progress view table in a way compatible with Apache Cassandra. Signed-off-by: Duarte Nunes --- db/system_keyspace.cc | 4 + db/view/build_progress_virtual_reader.hh | 195 +++++++++++++++++++++++ 2 files changed, 199 insertions(+) create mode 100644 db/view/build_progress_virtual_reader.hh diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 24eedf089a..906b5fc1d6 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -74,6 +74,7 @@ #include "db/size_estimates_virtual_reader.hh" #include "db/timeout_clock.hh" #include "sstables/sstables.hh" +#include "db/view/build_progress_virtual_reader.hh" #include "db/schema_tables.hh" using days = std::chrono::duration>; @@ -1575,6 +1576,9 @@ static void maybe_add_virtual_reader(schema_ptr s, database& db) { if (s.get() == size_estimates().get()) { db.find_column_family(s).set_virtual_reader(mutation_source(db::size_estimates::virtual_reader())); } + if (s.get() == v3::views_builds_in_progress().get()) { + db.find_column_family(s).set_virtual_reader(mutation_source(db::view::build_progress_virtual_reader(db))); + } } static bool maybe_write_in_user_memory(schema_ptr s, database& db) { diff --git a/db/view/build_progress_virtual_reader.hh b/db/view/build_progress_virtual_reader.hh new file mode 100644 index 0000000000..28e267ad9c --- /dev/null +++ b/db/view/build_progress_virtual_reader.hh @@ -0,0 +1,195 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "database.hh" +#include "db/system_keyspace.hh" +#include "db/timeout_clock.hh" +#include "dht/i_partitioner.hh" +#include "flat_mutation_reader.hh" +#include "mutation_fragment.hh" +#include "mutation_reader.hh" +#include "query-request.hh" +#include "schema.hh" +#include "tracing/tracing.hh" + +#include + +#include +#include + +namespace db::view { + +// Allows a user to query the views_builds_in_progress system table +// in terms of the scylla_views_builds_in_progress one, which is +// a superset of the former. When querying, we don't have to adjust +// the clustering key, but we have to adjust the requested regular +// columns. When reading the results from the scylla_views_builds_in_progress +// table, we adjust the clustering key (we shed the cpu_id column) and map +// back the regular columns. +class build_progress_virtual_reader { + database& _db; + + struct build_progress_reader : flat_mutation_reader::impl { + column_id _scylla_next_token_col; + column_id _scylla_generation_number_col; + column_id _legacy_last_token_col; + column_id _legacy_generation_number_col; + const query::partition_slice& _legacy_slice; + query::partition_slice _slice; + flat_mutation_reader _underlying; + + build_progress_reader( + schema_ptr legacy_schema, + column_family& scylla_views_build_progress, + const dht::partition_range& range, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr) + : flat_mutation_reader::impl(std::move(legacy_schema)) + , _scylla_next_token_col(scylla_views_build_progress.schema()->get_column_definition("next_token")->id) + , _scylla_generation_number_col(scylla_views_build_progress.schema()->get_column_definition("generation_number")->id) + , _legacy_last_token_col(_schema->get_column_definition("last_token")->id) + , _legacy_generation_number_col(_schema->get_column_definition("generation_number")->id) + , _legacy_slice(slice) + , _slice(adjust_partition_slice()) + , _underlying(scylla_views_build_progress.make_reader( + scylla_views_build_progress.schema(), + range, + slice, + pc, + std::move(trace_state), + fwd, + fwd_mr)) { + } + + const schema& underlying_schema() const { + return *_underlying.schema(); + } + + query::partition_slice adjust_partition_slice() { + auto slice = _legacy_slice; + std::vector adjusted_columns; + for (auto col_id : slice.regular_columns) { + if (col_id == _legacy_last_token_col) { + adjusted_columns.push_back(_scylla_next_token_col); + } else if (col_id == _legacy_generation_number_col) { + adjusted_columns.push_back(_scylla_generation_number_col); + } + } + slice.regular_columns = std::move(adjusted_columns); + return slice; + } + + clustering_key adjust_ckey(clustering_key& ck) { + if (ck.size(underlying_schema()) < 3) { + return std::move(ck); + } + // Drop the cpu_id from the clustering key + auto end = ck.begin(*_schema); + std::advance(end, 1); + auto r = boost::make_iterator_range(ck.begin(*_schema), std::move(end)); + return clustering_key_prefix::from_exploded(r); + } + + virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override { + return _underlying.fill_buffer(timeout).then([this] { + _end_of_stream = _underlying.is_end_of_stream(); + while (!_underlying.is_buffer_empty()) { + auto mf = _underlying.pop_mutation_fragment(); + if (mf.is_clustering_row()) { + auto scylla_in_progress_row = std::move(mf).as_clustering_row(); + auto legacy_in_progress_row = row(); + // Drop the first_token from the regular columns + scylla_in_progress_row.cells().for_each_cell([&, this] (column_id id, atomic_cell_or_collection& c) { + if (id == _scylla_next_token_col) { + legacy_in_progress_row.append_cell(_legacy_last_token_col, std::move(c)); + } else if (id == _scylla_generation_number_col) { + legacy_in_progress_row.append_cell(_legacy_generation_number_col, std::move(c)); + } + }); + mf = clustering_row( + adjust_ckey(scylla_in_progress_row.key()), + std::move(scylla_in_progress_row.tomb()), + std::move(scylla_in_progress_row.marker()), + std::move(legacy_in_progress_row)); + } else if (mf.is_range_tombstone()) { + auto scylla_in_progress_rt = std::move(mf).as_range_tombstone(); + mf = range_tombstone( + adjust_ckey(scylla_in_progress_rt.start), + scylla_in_progress_rt.start_kind, + scylla_in_progress_rt.end, + scylla_in_progress_rt.end_kind, + scylla_in_progress_rt.tomb); + } + push_mutation_fragment(mf); + } + }); + } + + virtual void next_partition() override { + _end_of_stream = false; + clear_buffer_to_next_partition(); + if (is_buffer_empty()) { + _underlying.next_partition(); + } + } + + virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { + clear_buffer(); + _end_of_stream = false; + return _underlying.fast_forward_to(pr, timeout); + } + + virtual future<> fast_forward_to(position_range range, db::timeout_clock::time_point timeout) override { + forward_buffer_to(range.start()); + _end_of_stream = false; + return _underlying.fast_forward_to(std::move(range), timeout); + } + }; + +public: + build_progress_virtual_reader(database& db) + : _db(db) { + } + + flat_mutation_reader operator()( + schema_ptr s, + const dht::partition_range& range, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr) { + return flat_mutation_reader(std::make_unique( + std::move(s), + _db.find_column_family(s->ks_name(), system_keyspace::v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS), + range, + slice, + pc, + std::move(trace_state), + fwd, + fwd_mr)); + } +}; + +} \ No newline at end of file From 4227641a3dc4548a1140815cdc1cc24d4be09678 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Thu, 22 Feb 2018 17:27:22 +0000 Subject: [PATCH 04/22] db/system_keyspace: Add API for MV-related system tables This patch implements an API to access the MV-related system tables, which pertain to the view building process. Signed-off-by: Duarte Nunes --- db/system_keyspace.cc | 79 +++++++++++++++++++++++++++++++++++++++++++ db/system_keyspace.hh | 18 ++++++++++ 2 files changed, 97 insertions(+) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 906b5fc1d6..826fb7f569 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -1804,6 +1804,85 @@ mutation make_size_estimates_mutation(const sstring& ks, std::vector register_view_for_building(sstring ks_name, sstring view_name, const dht::token& token) { + sstring req = sprint("INSERT INTO system.%s (keyspace_name, view_name, generation_number, cpu_id, first_token) VALUES (?, ?, ?, ?, ?)", + v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS); + return execute_cql( + std::move(req), + std::move(ks_name), + std::move(view_name), + 0, + int32_t(engine().cpu_id()), + dht::global_partitioner().to_sstring(token)).discard_result(); +} + +future<> update_view_build_progress(sstring ks_name, sstring view_name, const dht::token& token) { + sstring req = sprint("INSERT INTO system.%s (keyspace_name, view_name, next_token, cpu_id) VALUES (?, ?, ?, ?)", + v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS); + return execute_cql( + std::move(req), + std::move(ks_name), + std::move(view_name), + dht::global_partitioner().to_sstring(token), + int32_t(engine().cpu_id())).discard_result(); +} + +future<> remove_view_build_progress_across_all_shards(sstring ks_name, sstring view_name) { + return execute_cql( + sprint("DELETE FROM system.%s WHERE keyspace_name = ? AND view_name = ?", v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS), + std::move(ks_name), + std::move(view_name)).discard_result(); +} + +future<> mark_view_as_built(sstring ks_name, sstring view_name) { + return execute_cql( + sprint("INSERT INTO system.%s (keyspace_name, view_name) VALUES (?, ?)", v3::BUILT_VIEWS), + std::move(ks_name), + std::move(view_name)).discard_result(); +} + +future<> remove_built_view(sstring ks_name, sstring view_name) { + return execute_cql( + sprint("DELETE FROM system.%s WHERE keyspace_name = ? AND view_name = ?", v3::BUILT_VIEWS), + std::move(ks_name), + std::move(view_name)).discard_result(); +} + +future> load_built_views() { + return execute_cql(sprint("SELECT * FROM system.%s", v3::BUILT_VIEWS)).then([] (::shared_ptr cql_result) { + return boost::copy_range>(*cql_result + | boost::adaptors::transformed([] (const cql3::untyped_result_set::row& row) { + auto ks_name = row.get_as("keyspace_name"); + auto cf_name = row.get_as("view_name"); + return std::pair(std::move(ks_name), std::move(cf_name)); + })); + }); +} + +future> load_view_build_progress() { + return execute_cql(sprint("SELECT keyspace_name, view_name, first_token, next_token, cpu_id FROM system.%s", + v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS)).then([] (::shared_ptr cql_result) { + std::vector progress; + for (auto& row : *cql_result) { + auto ks_name = row.get_as("keyspace_name"); + auto cf_name = row.get_as("view_name"); + auto first_token = dht::global_partitioner().from_sstring(row.get_as("first_token")); + auto next_token_sstring = row.get_opt("next_token"); + std::optional next_token; + if (next_token_sstring) { + next_token = dht::global_partitioner().from_sstring(std::move(next_token_sstring).value()); + } + auto cpu_id = row.get_as("cpu_id"); + progress.emplace_back(view_build_progress{ + view_name(std::move(ks_name), std::move(cf_name)), + std::move(first_token), + std::move(next_token), + static_cast(cpu_id)}); + } + return progress; + }); +} + } // namespace system_keyspace sstring system_keyspace_name() { diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index d30cb50002..441e832ca3 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -40,8 +40,10 @@ #pragma once +#include #include #include +#include #include "schema.hh" #include "utils/UUID.hh" #include "gms/inet_address.hh" @@ -123,6 +125,14 @@ struct range_estimates { int64_t mean_partition_size; }; +using view_name = std::pair; +struct view_build_progress { + view_name view; + dht::token first_token; + std::optional next_token; + shard_id cpu_id; +}; + extern schema_ptr hints(); extern schema_ptr batchlog(); extern schema_ptr paxos(); @@ -653,5 +663,13 @@ future<> set_bootstrap_state(bootstrap_state state); */ mutation make_size_estimates_mutation(const sstring& ks, std::vector estimates); +future<> register_view_for_building(sstring ks_name, sstring view_name, const dht::token& token); +future<> update_view_build_progress(sstring ks_name, sstring view_name, const dht::token& token); +future<> remove_view_build_progress_across_all_shards(sstring ks_name, sstring view_name); +future<> mark_view_as_built(sstring ks_name, sstring view_name); +future<> remove_built_view(sstring ks_name, sstring view_name); +future> load_built_views(); +future> load_view_build_progress(); + } // namespace system_keyspace } // namespace db From 412f081db915ac1eb028c2a4c0bda35f8295415a Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Sun, 11 Mar 2018 20:48:26 +0000 Subject: [PATCH 05/22] tests: Add unit test for build_progress_virtual_reader Signed-off-by: Duarte Nunes --- tests/virtual_reader_test.cc | 51 +++++++++++++++++++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) diff --git a/tests/virtual_reader_test.cc b/tests/virtual_reader_test.cc index dc49865ccc..6a8e00b95e 100644 --- a/tests/virtual_reader_test.cc +++ b/tests/virtual_reader_test.cc @@ -33,12 +33,14 @@ #include "tests/test_services.hh" #include "db/size_estimates_virtual_reader.hh" +#include "db/system_keyspace.hh" #include "core/future-util.hh" #include "cql3/query_processor.hh" #include "cql3/untyped_result_set.hh" +#include "dht/i_partitioner.hh" #include "transport/messages/result_message.hh" -SEASTAR_TEST_CASE(test_query_virtual_table) { +SEASTAR_TEST_CASE(test_query_size_estimates_virtual_table) { return do_with_cql_env([] (auto& e) { auto ranges = db::size_estimates::size_estimates_mutation_reader::get_local_ranges().get0(); auto start_token1 = utf8_type->to_string(ranges[3].start); @@ -215,3 +217,50 @@ SEASTAR_TEST_CASE(test_query_virtual_table) { }).discard_result(); }); } + +SEASTAR_TEST_CASE(test_query_view_built_progress_virtual_table) { + return do_with_cql_env_thread([] (cql_test_env& e) { + auto rand = [] { return dht::global_partitioner().get_random_token(); }; + auto next_token = rand(); + auto next_token_str = dht::global_partitioner().to_sstring(next_token); + db::system_keyspace::register_view_for_building("ks", "v1", rand()).get(); + db::system_keyspace::register_view_for_building("ks", "v2", rand()).get(); + db::system_keyspace::register_view_for_building("ks", "v3", rand()).get(); + db::system_keyspace::update_view_build_progress("ks", "v3", next_token).get(); + db::system_keyspace::register_view_for_building("ks", "v4", rand()).get(); + db::system_keyspace::update_view_build_progress("ks", "v4", next_token).get(); + db::system_keyspace::register_view_for_building("ks", "v5", rand()).get(); + db::system_keyspace::register_view_for_building("ks", "v6", rand()).get(); + db::system_keyspace::remove_view_build_progress_across_all_shards("ks", "v5").get(); + db::system_keyspace::remove_view_build_progress_across_all_shards("ks", "v6").get(); + auto rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks'").get0(); + assert_that(rs).is_rows().with_rows_ignore_order({ + { {utf8_type->decompose(sstring("ks"))}, {utf8_type->decompose(sstring("v1"))}, {int32_type->decompose(0)}, { } }, + { {utf8_type->decompose(sstring("ks"))}, {utf8_type->decompose(sstring("v2"))}, {int32_type->decompose(0)}, { } }, + { {utf8_type->decompose(sstring("ks"))}, {utf8_type->decompose(sstring("v3"))}, {int32_type->decompose(0)}, {utf8_type->decompose(next_token_str)} }, + { {utf8_type->decompose(sstring("ks"))}, {utf8_type->decompose(sstring("v4"))}, {int32_type->decompose(0)}, {utf8_type->decompose(next_token_str)} } + }); + rs = e.execute_cql("select * from system.views_builds_in_progress").get0(); + assert_that(rs).is_rows().with_size(4); + rs = e.execute_cql("select * from system.views_builds_in_progress limit 1").get0(); + assert_that(rs).is_rows().with_size(1); + rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name = 'v2'").get0(); + assert_that(rs).is_rows().with_size(1); + rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name > 'v2'").get0(); + assert_that(rs).is_rows().with_size(2); + rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name >= 'v2'").get0(); + assert_that(rs).is_rows().with_size(3); + rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name < 'v2'").get0(); + assert_that(rs).is_rows().with_size(1); + rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name <= 'v2'").get0(); + assert_that(rs).is_rows().with_size(2); + rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name in ('v1', 'v2', 'v3')").get0(); + assert_that(rs).is_rows().with_size(3); + rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name >= 'v2' and view_name <= 'v2'").get0(); + assert_that(rs).is_rows().with_size(1); + rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name >= 'v2' and view_name <= 'v3'").get0(); + assert_that(rs).is_rows().with_size(2); + rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name > 'v1' and view_name < 'v2'").get0(); + assert_that(rs).is_rows().with_size(0); + }); +} From 78b232d98f2438d6b5c880bd2e040a40de4e758e Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Tue, 6 Mar 2018 14:24:15 +0000 Subject: [PATCH 06/22] db: Introduce system_distributed_keyspace This patch introduces a distributed system keyspace, used to hold system tables that need to be replicated across a set of replicas (that is, can't use the LocalStrategy). In following patches, we will use this keyspace to hold a table containing view building status updates for each node, used to support range movements and a new nodetool command. Fixes #3237 Signed-off-by: Duarte Nunes --- configure.py | 1 + db/system_distributed_keyspace.cc | 143 ++++++++++++++++++++++++++++++ db/system_distributed_keyspace.hh | 58 ++++++++++++ 3 files changed, 202 insertions(+) create mode 100644 db/system_distributed_keyspace.cc create mode 100644 db/system_distributed_keyspace.hh diff --git a/configure.py b/configure.py index 21f4fffea5..9ee18f1549 100755 --- a/configure.py +++ b/configure.py @@ -491,6 +491,7 @@ scylla_core = (['database.cc', 'cql3/variable_specifications.cc', 'db/consistency_level.cc', 'db/system_keyspace.cc', + 'db/system_distributed_keyspace.cc', 'db/schema_tables.cc', 'db/cql_type_parser.cc', 'db/legacy_schema_migrator.cc', diff --git a/db/system_distributed_keyspace.cc b/db/system_distributed_keyspace.cc new file mode 100644 index 0000000000..a09ff336ab --- /dev/null +++ b/db/system_distributed_keyspace.cc @@ -0,0 +1,143 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "db/system_distributed_keyspace.hh" + +#include "cql3/untyped_result_set.hh" +#include "database.hh" +#include "db/consistency_level_type.hh" +#include "db/system_keyspace.hh" +#include "schema_builder.hh" +#include "types.hh" + +#include +#include + +#include + +#include +#include +#include + +namespace db { + +schema_ptr view_build_status() { + static thread_local auto schema = [] { + auto id = generate_legacy_id(system_distributed_keyspace::NAME, system_distributed_keyspace::VIEW_BUILD_STATUS); + return schema_builder(system_distributed_keyspace::NAME, system_distributed_keyspace::VIEW_BUILD_STATUS, std::experimental::make_optional(id)) + .with_column("keyspace_name", utf8_type, column_kind::partition_key) + .with_column("view_name", utf8_type, column_kind::partition_key) + .with_column("host_id", uuid_type, column_kind::clustering_key) + .with_column("status", utf8_type) + .with_version(system_keyspace::generate_schema_version(id)) + .build(); + }(); + return schema; +} + +static std::vector all_tables() { + return { + view_build_status(), + }; +} + +system_distributed_keyspace::system_distributed_keyspace(cql3::query_processor& qp, service::migration_manager& mm) + : _qp(qp) + , _mm(mm) { +} + +future<> system_distributed_keyspace::start() { + if (engine().cpu_id() != 0) { + return make_ready_future<>(); + } + + static auto ignore_existing = [] (seastar::noncopyable_function()> func) { + return futurize_apply(std::move(func)).handle_exception_type([] (exceptions::already_exists_exception& ignored) { }); + }; + + // We use min_timestamp so that the default keyspace metadata will lose with any manual adjustments. + // See issue #2129. + return ignore_existing([this] { + auto ksm = keyspace_metadata::new_keyspace( + NAME, + "org.apache.cassandra.locator.SimpleStrategy", + {{"replication_factor", "3"}}, + true); + return _mm.announce_new_keyspace(ksm, api::min_timestamp, false); + }).then([this] { + return do_with(all_tables(), [this] (std::vector& tables) { + return do_for_each(tables, [this] (schema_ptr table) { + return ignore_existing([this, table = std::move(table)] { + return _mm.announce_new_column_family(std::move(table), false); + }); + }); + }); + }); +} + +future<> system_distributed_keyspace::stop() { + return make_ready_future<>(); +} + +future> system_distributed_keyspace::view_status(sstring ks_name, sstring view_name) const { + return _qp.process( + sprint("SELECT host_id, status FROM %s.%s WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS), + db::consistency_level::ONE, + { std::move(ks_name), std::move(view_name) }, + false).then([this] (::shared_ptr cql_result) { + return boost::copy_range>(*cql_result + | boost::adaptors::transformed([] (const cql3::untyped_result_set::row& row) { + auto host_id = row.get_as("host_id"); + auto status = row.get_as("status"); + return std::pair(std::move(host_id), std::move(status)); + })); + }); +} + +future<> system_distributed_keyspace::start_view_build(sstring ks_name, sstring view_name) const { + return db::system_keyspace::get_local_host_id().then([this, ks_name = std::move(ks_name), view_name = std::move(view_name)] (utils::UUID host_id) { + return _qp.process( + sprint("INSERT INTO %s.%s (keyspace_name, view_name, host_id, status) VALUES (?, ?, ?, ?)", NAME, VIEW_BUILD_STATUS), + db::consistency_level::ONE, + { std::move(ks_name), std::move(view_name), std::move(host_id), "STARTED" }, + false).discard_result(); + }); +} + +future<> system_distributed_keyspace::finish_view_build(sstring ks_name, sstring view_name) const { + return db::system_keyspace::get_local_host_id().then([this, ks_name = std::move(ks_name), view_name = std::move(view_name)] (utils::UUID host_id) { + return _qp.process( + sprint("UPDATE %s.%s SET status = ? WHERE keyspace_name = ? AND view_name = ? AND host_id = ?", NAME, VIEW_BUILD_STATUS), + db::consistency_level::ONE, + { "SUCCESS", std::move(ks_name), std::move(view_name), std::move(host_id) }, + false).discard_result(); + }); +} + +future<> system_distributed_keyspace::remove_view(sstring ks_name, sstring view_name) const { + return _qp.process( + sprint("DELETE FROM %s.%s WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS), + db::consistency_level::ONE, + { std::move(ks_name), std::move(view_name) }, + false).discard_result(); +} + +} \ No newline at end of file diff --git a/db/system_distributed_keyspace.hh b/db/system_distributed_keyspace.hh new file mode 100644 index 0000000000..39369c1e09 --- /dev/null +++ b/db/system_distributed_keyspace.hh @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include "bytes.hh" +#include "cql3/query_processor.hh" +#include "schema.hh" +#include "service/migration_manager.hh" +#include "utils/UUID.hh" + +#include +#include + +#include + +namespace db { + +class system_distributed_keyspace { +public: + static constexpr auto NAME = "system_distributed"; + static constexpr auto VIEW_BUILD_STATUS = "view_build_status"; + +private: + cql3::query_processor& _qp; + service::migration_manager& _mm; + +public: + system_distributed_keyspace(cql3::query_processor&, service::migration_manager&); + + future<> start(); + future<> stop(); + + future> view_status(sstring ks_name, sstring view_name) const; + future<> start_view_build(sstring ks_name, sstring view_name) const; + future<> finish_view_build(sstring ks_name, sstring view_name) const; + future<> remove_view(sstring ks_name, sstring view_name) const; +}; + +} \ No newline at end of file From ff15068a41ee7beda3f6436b94a5e91ac174ae7d Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Tue, 6 Mar 2018 14:31:27 +0000 Subject: [PATCH 07/22] service/storage_service: Allow querying the view build status This patch adds support for the nodetool viewbuildstatus command, which shows the progress of a materialized view build across the cluster. A view can be absent from the result, successfully built, or currently being built. Signed-off-by: Duarte Nunes --- api/api-doc/storage_service.json | 35 ++++++++++++++++++++++++++++++++ api/storage_service.cc | 9 ++++++++ init.cc | 4 ++-- init.hh | 3 ++- main.cc | 7 +++++-- service/storage_service.cc | 27 ++++++++++++++++++++++-- service/storage_service.hh | 10 ++++++--- tests/cql_test_env.cc | 8 ++++++-- tests/gossip.cc | 4 +++- tests/gossip_test.cc | 4 +++- 10 files changed, 97 insertions(+), 14 deletions(-) diff --git a/api/api-doc/storage_service.json b/api/api-doc/storage_service.json index f4ffb0f4d4..5756c7e898 100644 --- a/api/api-doc/storage_service.json +++ b/api/api-doc/storage_service.json @@ -2129,6 +2129,41 @@ ] } ] + }, + { + "path":"/storage_service/view_build_statuses/{keyspace}/{view}", + "operations":[ + { + "method":"GET", + "summary":"Gets the progress of a materialized view build", + "type":"array", + "items":{ + "type":"mapper" + }, + "nickname":"view_build_statuses", + "produces":[ + "application/json" + ], + "parameters":[ + { + "name":"keyspace", + "description":"The keyspace", + "required":true, + "allowMultiple":false, + "type":"string", + "paramType":"path" + }, + { + "name":"view", + "description":"View name", + "required":true, + "allowMultiple":false, + "type":"string", + "paramType":"path" + } + ] + } + ] } ], "models":{ diff --git a/api/storage_service.cc b/api/storage_service.cc index 4a9c6bee3b..385bd9303f 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -852,6 +852,15 @@ void set_storage_service(http_context& ctx, routes& r) { return make_ready_future(map_to_key_value(ownership, res)); }); }); + + ss::view_build_statuses.set(r, [&ctx] (std::unique_ptr req) { + auto keyspace = validate_keyspace(ctx, req->param); + auto view = req->param["view"]; + return service::get_local_storage_service().view_build_statuses(std::move(keyspace), std::move(view)).then([] (std::unordered_map status) { + std::vector res; + return make_ready_future(map_to_key_value(std::move(status), res)); + }); + }); } } diff --git a/init.cc b/init.cc index 6f5d9b993f..dbd72eb1bb 100644 --- a/init.cc +++ b/init.cc @@ -34,8 +34,8 @@ logging::logger startlog("init"); // duplicated in cql_test_env.cc // until proper shutdown is done. -void init_storage_service(distributed& db, sharded& auth_service) { - service::init_storage_service(db, auth_service).get(); +void init_storage_service(distributed& db, sharded& auth_service, sharded& sys_dist_ks) { + service::init_storage_service(db, auth_service, sys_dist_ks).get(); // #293 - do not stop anything //engine().at_exit([] { return service::deinit_storage_service(); }); } diff --git a/init.hh b/init.hh index 61b6728b57..c8f9eb960b 100644 --- a/init.hh +++ b/init.hh @@ -25,6 +25,7 @@ #include #include "auth/service.hh" #include "db/config.hh" +#include "db/system_distributed_keyspace.hh" #include "database.hh" #include "log.hh" @@ -36,7 +37,7 @@ extern logging::logger startlog; class bad_configuration_error : public std::exception {}; -void init_storage_service(distributed& db, sharded&); +void init_storage_service(distributed& db, sharded&, sharded&); void init_ms_fd_gossiper(sstring listen_address , uint16_t storage_port , uint16_t ssl_storage_port diff --git a/main.cc b/main.cc index 2cc82da107..3aca8d2f54 100644 --- a/main.cc +++ b/main.cc @@ -35,6 +35,7 @@ #include "service/load_broadcaster.hh" #include "streaming/stream_session.hh" #include "db/system_keyspace.hh" +#include "db/system_distributed_keyspace.hh" #include "db/batchlog_manager.hh" #include "db/commitlog/commitlog.hh" #include "db/hints/manager.hh" @@ -457,9 +458,11 @@ int main(int ac, char** av) { ctx.http_server.listen(ipv4_addr{ip, api_port}).get(); startlog.info("Scylla API server listening on {}:{} ...", api_address, api_port); static sharded auth_service; + static sharded sys_dist_ks; supervisor::notify("initializing storage service"); - init_storage_service(db, auth_service); + init_storage_service(db, auth_service, sys_dist_ks); supervisor::notify("starting per-shard database core"); + // Note: changed from using a move here, because we want the config object intact. database_config dbcfg; auto make_sched_group = [&] (sstring name, unsigned shares) { @@ -614,7 +617,7 @@ int main(int ac, char** av) { } // If the same sstable is shared by several shards, it cannot be // deleted until all shards decide to compact it. So we want to - // start thse compactions now. Note we start compacting only after + // start these compactions now. Note we start compacting only after // all sstables in this CF were loaded on all shards - otherwise // we will have races between the compaction and loading processes // We also want to trigger regular compaction on boot. diff --git a/service/storage_service.cc b/service/storage_service.cc index f8cc9ac9fa..f27d29adcc 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -105,11 +105,12 @@ int get_generation_number() { return generation_number; } -storage_service::storage_service(distributed& db, sharded& auth_service) +storage_service::storage_service(distributed& db, sharded& auth_service, sharded& sys_dist_ks) : _db(db) , _auth_service(auth_service) , _replicate_action([this] { return do_replicate_to_all_cores(); }) - , _update_pending_ranges_action([this] { return do_update_pending_ranges(); }) { + , _update_pending_ranges_action([this] { return do_update_pending_ranges(); }) + , _sys_dist_ks(sys_dist_ks) { sstable_read_error.connect([this] { isolate_on_error(); }); sstable_write_error.connect([this] { isolate_on_error(); }); general_disk_error.connect([this] { isolate_on_error(); }); @@ -551,6 +552,12 @@ void storage_service::join_token_ring(int delay) { supervisor::notify("starting tracing"); tracing::tracing::start_tracing().get(); + + supervisor::notify("starting system distributed keyspace"); + _sys_dist_ks.start( + std::ref(cql3::get_query_processor()), + std::ref(service::get_migration_manager())).get(); + _sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start).get(); } else { slogger.info("Startup complete, but write survey mode is active, not becoming an active ring member. Use JMX (StorageService->joinRing()) to finalize ring joining."); } @@ -1259,6 +1266,9 @@ future<> storage_service::drain_on_shutdown() { tracing::tracing::tracing_instance().stop().get(); slogger.info("Drain on shutdown: tracing is stopped"); + ss._sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::stop).get(); + slogger.info("Drain on shutdown: system distributed keyspace stopped"); + get_storage_proxy().invoke_on_all([] (storage_proxy& local_proxy) { return local_proxy.stop_hints_manager(); }).get(); @@ -3419,5 +3429,18 @@ storage_service::get_natural_endpoints(const sstring& keyspace, const token& pos return _db.local().find_keyspace(keyspace).get_replication_strategy().get_natural_endpoints(pos); } +future> +storage_service::view_build_statuses(sstring keyspace, sstring view_name) const { + return _sys_dist_ks.local().view_status(std::move(keyspace), std::move(view_name)).then([this] (std::unordered_map status) { + auto& endpoint_to_host_id = get_token_metadata().get_endpoint_to_host_id_map_for_reading(); + return boost::copy_range>(endpoint_to_host_id + | boost::adaptors::transformed([&status] (const std::pair& p) { + auto it = status.find(p.second); + auto s = it != status.end() ? std::move(it->second) : "UNKNOWN"; + return std::pair(p.first.to_sstring(), std::move(s)); + })); + }); +} + } // namespace service diff --git a/service/storage_service.hh b/service/storage_service.hh index 3e971fb65a..df890b9c91 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -51,6 +51,7 @@ #include "dht/token_range_endpoints.hh" #include "core/sleep.hh" #include "gms/application_state.hh" +#include "db/system_distributed_keyspace.hh" #include "core/semaphore.hh" #include "utils/fb_utilities.hh" #include "utils/serialized_action.hh" @@ -133,7 +134,7 @@ private: bool _ms_stopped = false; bool _stream_manager_stopped = false; public: - storage_service(distributed& db, sharded&); + storage_service(distributed& db, sharded&, sharded&); void isolate_on_error(); void isolate_on_commit_error(); @@ -732,6 +733,7 @@ private: future<> do_replicate_to_all_cores(); serialized_action _replicate_action; serialized_action _update_pending_ranges_action; + sharded& _sys_dist_ks; private: /** * Replicates token_metadata contents on shard0 instance to other shards. @@ -2026,6 +2028,8 @@ public: } #endif + future> view_build_statuses(sstring keyspace, sstring view_name) const; + private: /** * Seed data to the endpoints that will be responsible for it at the future @@ -2307,8 +2311,8 @@ public: } }; -inline future<> init_storage_service(distributed& db, sharded& auth_service) { - return service::get_storage_service().start(std::ref(db), std::ref(auth_service)); +inline future<> init_storage_service(distributed& db, sharded& auth_service, sharded& sys_dist_ks) { + return service::get_storage_service().start(std::ref(db), std::ref(auth_service), std::ref(sys_dist_ks)); } inline future<> deinit_storage_service() { diff --git a/tests/cql_test_env.cc b/tests/cql_test_env.cc index f96e217ac4..fad14ee8c0 100644 --- a/tests/cql_test_env.cc +++ b/tests/cql_test_env.cc @@ -47,6 +47,7 @@ #include "service/storage_service.hh" #include "auth/service.hh" #include "db/system_keyspace.hh" +#include "db/system_distributed_keyspace.hh" namespace sstables { @@ -308,9 +309,11 @@ public: auto stop_ms = defer([&ms] { ms.stop().get(); }); auto auth_service = ::make_shared>(); + auto sys_dist_ks = seastar::sharded(); + auto stop_sys_dist_ks = defer([&sys_dist_ks] { sys_dist_ks.stop().get(); }); auto& ss = service::get_storage_service(); - ss.start(std::ref(*db), std::ref(*auth_service)).get(); + ss.start(std::ref(*db), std::ref(*auth_service), std::ref(sys_dist_ks)).get(); auto stop_storage_service = defer([&ss] { ss.stop().get(); }); db->start(std::move(*cfg), database_config()).get(); @@ -423,12 +426,13 @@ future<> do_with_cql_env_thread(std::function func) { class storage_service_for_tests::impl { distributed _db; sharded _auth_service; + sharded _sys_dist_ks; public: impl() { auto thread = seastar::thread_impl::get(); assert(thread); netw::get_messaging_service().start(gms::inet_address("127.0.0.1"), 7000, false).get(); - service::get_storage_service().start(std::ref(_db), std::ref(_auth_service)).get(); + service::get_storage_service().start(std::ref(_db), std::ref(_auth_service), std::ref(_sys_dist_ks)).get(); service::get_storage_service().invoke_on_all([] (auto& ss) { ss.enable_all_features(); }).get(); diff --git a/tests/gossip.cc b/tests/gossip.cc index c42de5b6fc..8cac8a8b3c 100644 --- a/tests/gossip.cc +++ b/tests/gossip.cc @@ -22,6 +22,7 @@ #include "core/reactor.hh" #include "core/app-template.hh" +#include "db/system_distributed_keyspace.hh" #include "message/messaging_service.hh" #include "gms/failure_detector.hh" #include "gms/gossiper.hh" @@ -69,7 +70,8 @@ int main(int ac, char ** av) { utils::fb_utilities::set_broadcast_rpc_address(listen); auto vv = std::make_shared(); locator::i_endpoint_snitch::create_snitch("SimpleSnitch").then([&auth_service, &db] { - return service::init_storage_service(db, auth_service); + sharded sys_dist_ks; + return service::init_storage_service(db, auth_service, sys_dist_ks); }).then([vv, listen, config] { return netw::get_messaging_service().start(listen); }).then([config] { diff --git a/tests/gossip_test.cc b/tests/gossip_test.cc index 12fd4867c4..2def02f6d1 100644 --- a/tests/gossip_test.cc +++ b/tests/gossip_test.cc @@ -31,14 +31,16 @@ #include "service/storage_service.hh" #include "core/distributed.hh" #include "database.hh" +#include "db/system_distributed_keyspace.hh" SEASTAR_TEST_CASE(test_boot_shutdown){ return seastar::async([] { distributed db; sharded auth_service; + sharded sys_dist_ks; utils::fb_utilities::set_broadcast_address(gms::inet_address("127.0.0.1")); locator::i_endpoint_snitch::create_snitch("SimpleSnitch").get(); - service::get_storage_service().start(std::ref(db), std::ref(auth_service)).get(); + service::get_storage_service().start(std::ref(db), std::ref(auth_service), std::ref(sys_dist_ks)).get(); db.start().get(); netw::get_messaging_service().start(gms::inet_address("127.0.0.1")).get(); gms::get_failure_detector().start().get(); From dc44a0837061e1ee332b7fcfd5db6dbac1b43724 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Wed, 12 Jul 2017 00:28:23 +0200 Subject: [PATCH 08/22] db/view: Return a future when sending view updates While we now send view mutations asynchronously in the normal view write path, other processes interested in sending view updates, such as streaming or view building, may wish to do it synchronously. Signed-off-by: Duarte Nunes --- database.cc | 2 +- db/view/view.cc | 15 ++++++++++----- db/view/view.hh | 2 +- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/database.cc b/database.cc index fc1afa3326..3ebbfe7251 100644 --- a/database.cc +++ b/database.cc @@ -4317,7 +4317,7 @@ future<> column_family::generate_and_propagate_view_updates(const schema_ptr& ba std::move(views), flat_mutation_reader_from_mutations({std::move(m)}), std::move(existings)).then([base_token = std::move(base_token)] (auto&& updates) { - db::view::mutate_MV(std::move(base_token), std::move(updates)); + db::view::mutate_MV(std::move(base_token), std::move(updates)).handle_exception([] (auto ignored) { }); }); } diff --git a/db/view/view.cc b/db/view/view.cc index de21a483c7..9f9ad1ae4a 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -791,7 +791,7 @@ get_view_natural_endpoint(const sstring& keyspace_name, // for the writes to complete. // FIXME: I dropped a lot of parameters the Cassandra version had, // we may need them back: writeCommitLog, baseComplete, queryStartNanoTime. -void mutate_MV(const dht::token& base_token, +future<> mutate_MV(const dht::token& base_token, std::vector mutations) { #if 0 @@ -823,6 +823,7 @@ void mutate_MV(const dht::token& base_token, () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID)); // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet #endif + auto fs = std::make_unique>>(); for (auto& mut : mutations) { auto view_token = mut.token(); auto keyspace_name = mut.schema()->ks_name(); @@ -840,9 +841,10 @@ void mutate_MV(const dht::token& base_token, // do not wait for it to complete. // Note also that mutate_locally(mut) copies mut (in // frozen form) so don't need to increase its lifetime. - service::get_local_storage_proxy().mutate_locally(mut).handle_exception([] (auto ep) { + fs->push_back(service::get_local_storage_proxy().mutate_locally(mut).handle_exception([] (auto ep) { vlogger.error("Error applying local view update: {}", ep); - }); + return make_exception_future<>(std::move(ep)); + })); } else { vlogger.debug("Sending view update to endpoint {}, with pending endpoints = {}", *paired_endpoint, pending_endpoints); // Note we don't wait for the asynchronous operation to complete @@ -852,9 +854,10 @@ void mutate_MV(const dht::token& base_token, // to send the update there. Currently, we do this from *each* of // the base replicas, but this is probably excessive - see // See https://issues.apache.org/jira/browse/CASSANDRA-14262/ - service::get_local_storage_proxy().send_to_endpoint(std::move(mut), *paired_endpoint, std::move(pending_endpoints), db::write_type::VIEW).handle_exception([paired_endpoint] (auto ep) { + fs->push_back(service::get_local_storage_proxy().send_to_endpoint(std::move(mut), *paired_endpoint, std::move(pending_endpoints), db::write_type::VIEW).handle_exception([paired_endpoint] (auto ep) { vlogger.error("Error applying view update to {}: {}", *paired_endpoint, ep); - });; + return make_exception_future<>(std::move(ep)); + })); } } else { #if 0 @@ -897,6 +900,8 @@ void mutate_MV(const dht::token& base_token, viewWriteMetrics.addNano(System.nanoTime() - startTime); } #endif + auto f = seastar::when_all_succeed(fs->begin(), fs->end()); + return f.finally([fs = std::move(fs)] { }); } } // namespace view diff --git a/db/view/view.hh b/db/view/view.hh index 3f21fb21cb..92bdef11a8 100644 --- a/db/view/view.hh +++ b/db/view/view.hh @@ -92,7 +92,7 @@ query::clustering_row_ranges calculate_affected_clustering_ranges( const mutation_partition& mp, const std::vector& views); -void mutate_MV(const dht::token& base_token, +future<> mutate_MV(const dht::token& base_token, std::vector mutations); } From 9b9ba525f70df1f2fc6c979069b42c7f073b4bf7 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Mon, 12 Mar 2018 19:24:13 +0000 Subject: [PATCH 09/22] database: Add get_views() function Returns all the schemas that are views. Signed-off-by: Duarte Nunes --- database.cc | 5 +++++ database.hh | 2 ++ 2 files changed, 7 insertions(+) diff --git a/database.cc b/database.cc index 3ebbfe7251..513cfcfa57 100644 --- a/database.cc +++ b/database.cc @@ -2914,6 +2914,11 @@ bool database::has_schema(const sstring& ks_name, const sstring& cf_name) const return _ks_cf_to_uuid.count(std::make_pair(ks_name, cf_name)) > 0; } +std::vector database::get_views() const { + return boost::copy_range>(get_non_system_column_families() + | boost::adaptors::filtered([] (auto& cf) { return cf->schema()->is_view(); }) + | boost::adaptors::transformed([] (auto& cf) { return view_ptr(cf->schema()); })); +} void database::create_in_memory_keyspace(const lw_shared_ptr& ksm) { keyspace ks(ksm, std::move(make_keyspace_config(*ksm))); diff --git a/database.hh b/database.hh index ee49939b7b..573e3d4733 100644 --- a/database.hh +++ b/database.hh @@ -1273,6 +1273,8 @@ public: std::vector> get_non_system_column_families() const; + std::vector get_views() const; + const std::unordered_map, utils::UUID, utils::tuple_hash>& get_column_families_mapping() const { return _ks_cf_to_uuid; From 9640205f11a6d68d80d22dd7f124a6eb867518b5 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Mon, 12 Mar 2018 19:24:52 +0000 Subject: [PATCH 10/22] database: Compare view id instead of name in find_views() Signed-off-by: Duarte Nunes --- database.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/database.cc b/database.cc index 513cfcfa57..374d134333 100644 --- a/database.cc +++ b/database.cc @@ -4270,9 +4270,10 @@ void column_family::set_schema(schema_ptr s) { static std::vector::iterator find_view(std::vector& views, const view_ptr& v) { return std::find_if(views.begin(), views.end(), [&v] (auto&& e) { - return e->cf_name() == v->cf_name(); + return e->id() == v->id(); }); } + void column_family::add_or_update_view(view_ptr v) { auto existing = find_view(_views, v); if (existing != _views.end()) { From 67dd3e6e5dab10fa3c0dd8fe56a669d40646849b Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Mon, 12 Mar 2018 19:27:02 +0000 Subject: [PATCH 11/22] column_family: Allow synchronizing with in-progress writes This patch adds a mechanism to class column_family through which we can synchronize with in-progress writes. This is useful for code that, after some modification, needs to ensure that new writes will see it before it can proceed. In particular, this will be used by the view building code, which needs to wait until the in-progress writes, which may have missed that there is now a view, is observable to the view building code. Signed-off-by: Duarte Nunes --- database.cc | 13 ++++++++----- database.hh | 13 +++++++++++++ 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/database.cc b/database.cc index 374d134333..46b2f23e7d 100644 --- a/database.cc +++ b/database.cc @@ -3264,7 +3264,7 @@ future database::do_apply_counter_update(column_family& cf, const froz std::move(regular_columns), { }, { }, cql_serialization_format::internal(), query::max_rows); return do_with(std::move(slice), std::move(m), std::vector(), - [this, &cf, timeout, trace_state = std::move(trace_state)] (const query::partition_slice& slice, mutation& m, std::vector& locks) mutable { + [this, &cf, timeout, trace_state = std::move(trace_state), op = cf.write_in_progress()] (const query::partition_slice& slice, mutation& m, std::vector& locks) mutable { tracing::trace(trace_state, "Acquiring counter locks"); return cf.lock_counter_cells(m, timeout).then([&, m_schema = cf.schema(), trace_state = std::move(trace_state), timeout, this] (std::vector lcs) mutable { locks = std::move(lcs); @@ -3496,16 +3496,19 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, db::timeout_ throw std::runtime_error(sprint("attempted to mutate using not synced schema of %s.%s, version=%s", s->ks_name(), s->cf_name(), s->version())); } + + // Signal to view building code that a write is in progress, + // so it knows when new writes start being sent to a new view. + auto op = cf.write_in_progress(); if (cf.views().empty()) { - return apply_with_commitlog(std::move(s), cf, std::move(uuid), m, timeout); + return apply_with_commitlog(std::move(s), cf, std::move(uuid), m, timeout).finally([op = std::move(op)] { }); } future f = cf.push_view_replica_updates(s, m); - return f.then([this, s = std::move(s), uuid = std::move(uuid), &m, timeout] (row_locker::lock_holder lock) { - auto& cf = find_column_family(uuid); + return f.then([this, s = std::move(s), uuid = std::move(uuid), &m, timeout, &cf, op = std::move(op)] (row_locker::lock_holder lock) mutable { return apply_with_commitlog(std::move(s), cf, std::move(uuid), m, timeout).finally( // Hold the local lock on the base-table partition or row // taken before the read, until the update is done. - [lock = std::move(lock)] { }); + [lock = std::move(lock), op = std::move(op)] { }); }); }); } diff --git a/database.hh b/database.hh index 573e3d4733..b83c84f8d8 100644 --- a/database.hh +++ b/database.hh @@ -461,6 +461,11 @@ private: double _cached_percentile = -1; lowres_clock::time_point _percentile_cache_timestamp; std::chrono::milliseconds _percentile_cache_value; + + // Phaser used to synchronize with in-progress writes. This is useful for code that, + // after some modification, needs to ensure that news writes will see it before + // it can proceed, such as the view building code. + utils::phased_barrier _pending_writes_phaser; private: void update_stats_for_new_sstable(uint64_t disk_space_used_by_sstable, const std::vector& shards_for_the_sstable) noexcept; // Adds new sstable to the set of sstables @@ -784,6 +789,14 @@ public: future<> run_with_compaction_disabled(std::function ()> func); + utils::phased_barrier::operation write_in_progress() { + return _pending_writes_phaser.start(); + } + + future<> await_pending_writes() { + return _pending_writes_phaser.advance_and_await(); + } + void add_or_update_view(view_ptr v); void remove_view(view_ptr v); const std::vector& views() const; From f298f57137c0eb1da16f46a287503fb52b351ece Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Mon, 12 Mar 2018 19:37:28 +0000 Subject: [PATCH 12/22] column_family: Add function to populate views The populate_views() function takes a set of views to update, a tokento select base table partitions, and the set of sstables to query. This lays the foundation for a view building mechanism to exist, which walks over a given base table, reads data token-by-token, calculates view updates (in a simplified way, compared to the existing functions that push view updates), and sends them to the paired view replicas. Signed-off-by: Duarte Nunes --- database.cc | 25 +++++++++++++++++++++++++ database.hh | 6 ++++++ 2 files changed, 31 insertions(+) diff --git a/database.cc b/database.cc index 46b2f23e7d..1a983488d3 100644 --- a/database.cc +++ b/database.cc @@ -4465,6 +4465,31 @@ column_family::local_base_lock(const schema_ptr& s, const dht::decorated_key& pk } } +/** + * Given some updates on the base table and assuming there are no pre-existing, overlapping updates, + * generates the mutations to be applied to the base table's views, and sends them to the paired + * view replicas. The future resolves when the updates have been acknowledged by the repicas, i.e., + * propagating the view updates to the view replicas happens synchronously. + * + * @param views the affected views which need to be updated. + * @param base_token The token to use to match the base replica with the paired replicas. + * @param reader the base table updates being applied, which all correspond to the base token. + * @return a future that resolves when the updates have been acknowledged by the view replicas + */ +future<> column_family::populate_views( + std::vector views, + dht::token base_token, + flat_mutation_reader&& reader) { + auto& schema = reader.schema(); + return db::view::generate_view_updates( + schema, + std::move(views), + std::move(reader), + { }).then([base_token = std::move(base_token)] (auto&& updates) { + return db::view::mutate_MV(std::move(base_token), std::move(updates)); + }); +} + void column_family::set_hit_rate(gms::inet_address addr, cache_temperature rate) { auto& e = _cluster_cache_hit_rates[addr]; e.rate = rate; diff --git a/database.hh b/database.hh index b83c84f8d8..7cf2b088ff 100644 --- a/database.hh +++ b/database.hh @@ -811,6 +811,12 @@ public: uint64_t large_partition_warning_threshold_bytes() const { return _config.large_partition_warning_threshold_bytes; } + + future<> populate_views( + std::vector, + dht::token base_token, + flat_mutation_reader&&); + private: std::vector affected_views(const schema_ptr& base, const mutation& update) const; future<> generate_and_propagate_view_updates(const schema_ptr& base, From 901faabaa2d4ceda0b41497a298717203d118894 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Mon, 12 Mar 2018 20:46:12 +0000 Subject: [PATCH 13/22] db/view: Introduce view_builder This patch introduces the view_builder class, a sharded service responsible for building all defined materialized views. This process entails walking over the existing data in a given base table, and using it to calculate and insert the respective entries for one or more views. This patch introduces only the bootstrap functionality, which is responsible for loading the data stored in the system tables and filling the in-memory data structures with the relevant information, to be used in subsequent patches for the actual view building. The interaction with the system tables is as follows. Interaction with the tables in system_keyspace: - When we start building a view, we add an entry to the scylla_views_builds_in_progress system table. If the node restarts at this point, we'll consider these newly inserted views as having made no progress, and we'll treat them as new views; - When we finish a build step, we update the progress of the views that we built during this step by writing the next token to the scylla_views_builds_in_progress table. If the node restarts here, we'll start building the views at the token in the next_token column. - When we finish building a view, we mark it as completed in the built views system table, and remove it from the in-progress system table. Under failure, the following can happen: * When we fail to mark the view as built, we'll redo the last step upon node reboot; * When we fail to delete the in-progress record, upon reboot we'll remove this record. A view is marked as completed only when all shards have finished their share of the work, that is, if a view is not built, then all shards will still have an entry in the in-progress system table; - A view that a shard finished building, but not all other shards, remains in the in-progress system table, with first_token == next_token. Interaction with the distributed system table (view_build_status): - When we start building a view, we mark the view build as being in-progress; - When we finish building a view, we mark the view as being built. Upon failure, we ensure that if the view is in the in-progress system table, then it may not have been written to this table. We don't load the built views from this table when starting. When starting, the following happens: * If the view is in the system.built_views table and not the in-progress system table, then it will be in view_build_status; * If the view is in the system.built_views table and not in this one, it will still be in the in-progress system table - we detect this and mark it as built in this table too, keeping the invariant; * If the view is in this table but not in system.built_views, then it will also be in the in-progress system table - we don't detect this and will redo the missing step, for simplicity. View building is necessarily a sharded process. That means that on restart, if the number of shards has changed, we need to calculate the most conservative token range that has been built, and build the remainder. Signed-off-by: Duarte Nunes --- db/view/view.cc | 268 +++++++++++++++++++++++++++++++++++++++- db/view/view_builder.hh | 162 ++++++++++++++++++++++++ main.cc | 10 ++ 3 files changed, 439 insertions(+), 1 deletion(-) create mode 100644 db/view/view_builder.hh diff --git a/db/view/view.cc b/db/view/view.cc index 9f9ad1ae4a..ef6bcdeeed 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -39,22 +39,33 @@ * along with Scylla. If not, see . */ -#include #include +#include +#include +#include +#include +#include #include #include +#include + +#include "database.hh" #include "clustering_bounds_comparator.hh" #include "cql3/statements/select_statement.hh" #include "cql3/util.hh" #include "db/view/view.hh" +#include "db/view/view_builder.hh" #include "gms/inet_address.hh" #include "keys.hh" #include "locator/network_topology_strategy.hh" +#include "service/migration_manager.hh" #include "service/storage_service.hh" #include "view_info.hh" +using namespace std::chrono_literals; + static logging::logger vlogger("view"); view_info::view_info(const schema& schema, const raw_view_info& raw_view_info) @@ -904,6 +915,261 @@ future<> mutate_MV(const dht::token& base_token, return f.finally([fs = std::move(fs)] { }); } +view_builder::view_builder(database& db, db::system_distributed_keyspace& sys_dist_ks) + : _db(db) + , _sys_dist_ks(sys_dist_ks) { +} + +future<> view_builder::start() { + return seastar::async([this] { + auto built = system_keyspace::load_built_views().get0(); + auto in_progress = system_keyspace::load_view_build_progress().get0(); + calculate_shard_build_step(std::move(built), std::move(in_progress)).get(); + _current_step = _base_to_build_step.begin(); + _build_step.trigger(); + }); +} + +future<> view_builder::stop() { + vlogger.info("Stopping view builder"); + return _build_step.join(); +} + +static query::partition_slice make_partition_slice(const schema& s) { + query::partition_slice::option_set opts; + opts.set(query::partition_slice::option::send_partition_key); + opts.set(query::partition_slice::option::send_clustering_key); + opts.set(query::partition_slice::option::send_timestamp); + opts.set(query::partition_slice::option::send_ttl); + return query::partition_slice( + {query::full_clustering_range}, + { }, + boost::copy_range>(s.regular_columns() + | boost::adaptors::transformed(std::mem_fn(&column_definition::id))), + std::move(opts)); +} + +view_builder::build_step& view_builder::get_or_create_build_step(utils::UUID base_id) { + auto it = _base_to_build_step.find(base_id); + if (it == _base_to_build_step.end()) { + auto base = _db.find_column_family(base_id).shared_from_this(); + auto p = _base_to_build_step.emplace(base_id, build_step{base, make_partition_slice(*base->schema())}); + // Iterators could have been invalidated if there was rehashing, so just reset the cursor. + _current_step = p.first; + it = p.first; + } + return it->second; +} + +void view_builder::initialize_reader_at_current_token(build_step& step) { + step.pslice = make_partition_slice(*step.base->schema()); + step.prange = dht::partition_range(dht::ring_position::starting_at(step.current_token()), dht::ring_position::max()); + step.reader = make_local_shard_sstable_reader( + step.base->schema(), + make_lw_shared(sstables::sstable_set(step.base->get_sstable_set())), + step.prange, + step.pslice, + default_priority_class(), + no_resource_tracking(), + nullptr, + streamed_mutation::forwarding::no, + mutation_reader::forwarding::no); +} + +void view_builder::load_view_status(view_builder::view_build_status status, std::unordered_set& loaded_views) { + if (!status.next_token) { + // No progress was made on this view, so we'll treat it as new. + return; + } + vlogger.info0("Resuming to build view {}.{} at {}", status.view->ks_name(), status.view->cf_name(), *status.next_token); + loaded_views.insert(status.view->id()); + if (status.first_token == *status.next_token) { + // Completed, so nothing to do for this shard. Consider the view + // as loaded and not as a new view. + _built_views.emplace(status.view->id()); + return; + } + get_or_create_build_step(status.view->view_info()->base_id()).build_status.emplace_back(std::move(status)); +} + +void view_builder::reshard( + std::vector> view_build_status_per_shard, + std::unordered_set& loaded_views) { + // We must reshard. We aim for a simple algorithm, a step above not starting from scratch. + // Shards build entries at different paces, so both first and last tokens will differ. We + // want to be conservative when selecting the range that has been built. To do that, we + // select the intersection of all the previous shard's ranges for each view. + struct view_ptr_hash { + std::size_t operator()(const view_ptr& v) const noexcept { + return std::hash()(v->id()); + } + }; + struct view_ptr_equals { + bool operator()(const view_ptr& v1, const view_ptr& v2) const noexcept { + return v1->id() == v2->id(); + } + }; + std::unordered_map>, view_ptr_hash, view_ptr_equals> my_status; + for (auto& shard_status : view_build_status_per_shard) { + for (auto& [view, first_token, next_token] : shard_status ) { + // We start from an open-ended range, which we'll try to restrict. + auto& my_range = my_status.emplace( + std::move(view), + nonwrapping_range::make_open_ended_both_sides()).first->second; + if (!next_token || !my_range) { + // A previous shard made no progress, so for this view we'll start over. + my_range = stdx::nullopt; + continue; + } + if (first_token == *next_token) { + // Completed, so don't consider this shard's progress. We know that if the view + // is marked as in-progress, then at least one shard will have a non-full range. + continue; + } + wrapping_range other_range(first_token, *next_token); + if (other_range.is_wrap_around(dht::token_comparator())) { + // The intersection of a wrapping range with a non-wrapping range may yield more + // multiple non-contiguous ranges. To avoid the complexity of dealing with more + // than one range, we'll just take one of the intersections. + auto [bottom_range, top_range] = other_range.unwrap(); + if (auto bottom_int = my_range->intersection(nonwrapping_range(std::move(bottom_range)), dht::token_comparator())) { + my_range = std::move(bottom_int); + } else { + my_range = my_range->intersection(nonwrapping_range(std::move(top_range)), dht::token_comparator()); + } + } else { + my_range = my_range->intersection(nonwrapping_range(std::move(other_range)), dht::token_comparator()); + } + } + } + view_builder::base_to_build_step_type build_step; + for (auto& [view, opt_range] : my_status) { + if (!opt_range) { + continue; // Treat it as a new table. + } + auto start_bound = opt_range->start() ? std::move(opt_range->start()->value()) : dht::minimum_token(); + auto end_bound = opt_range->end() ? std::move(opt_range->end()->value()) : dht::minimum_token(); + auto s = view_build_status{std::move(view), std::move(start_bound), std::move(end_bound)}; + load_view_status(std::move(s), loaded_views); + } +} + +future<> view_builder::calculate_shard_build_step( + std::vector built, + std::vector in_progress) { + // Shard 0 makes cleanup changes to the system tables, but none that could conflict + // with the other shards; everyone is thus able to proceed independently. + auto bookkeeping_ops = std::make_unique>>(); + auto base_table_exists = [&, this] (const view_ptr& view) { + // This is a safety check in case this node missed a create MV statement + // but got a drop table for the base, and another node didn't get the + // drop notification and sent us the view schema. + try { + _db.find_schema(view->view_info()->base_id()); + return true; + } catch (const no_such_column_family&) { + return false; + } + }; + auto maybe_fetch_view = [&, this] (system_keyspace::view_name& name) { + try { + auto s = _db.find_schema(name.first, name.second); + if (s->is_view()) { + auto view = view_ptr(std::move(s)); + if (base_table_exists(view)) { + return view; + } + } + // The view was dropped and a table was re-created with the same name, + // but the write to the view-related system tables didn't make it. + } catch (const no_such_column_family&) { + // Fall-through + } + if (engine().cpu_id() == 0) { + bookkeeping_ops->push_back(_sys_dist_ks.remove_view(name.first, name.second)); + bookkeeping_ops->push_back(system_keyspace::remove_built_view(name.first, name.second)); + bookkeeping_ops->push_back( + system_keyspace::remove_view_build_progress_across_all_shards( + std::move(name.first), + std::move(name.second))); + } + return view_ptr(nullptr); + }; + + auto built_views = boost::copy_range>(built + | boost::adaptors::transformed(maybe_fetch_view) + | boost::adaptors::filtered([] (const view_ptr& v) { return bool(v); }) + | boost::adaptors::transformed([] (const view_ptr& v) { return v->id(); })); + + std::vector> view_build_status_per_shard; + for (auto& [view_name, first_token, next_token_opt, cpu_id] : in_progress) { + if (auto view = maybe_fetch_view(view_name)) { + if (built_views.find(view->id()) != built_views.end()) { + if (engine().cpu_id() == 0) { + auto f = _sys_dist_ks.finish_view_build(std::move(view_name.first), std::move(view_name.second)).then([view = std::move(view)] { + system_keyspace::remove_view_build_progress_across_all_shards(view->cf_name(), view->ks_name()); + }); + bookkeeping_ops->push_back(std::move(f)); + } + continue; + } + view_build_status_per_shard.resize(std::max(view_build_status_per_shard.size(), size_t(cpu_id + 1))); + view_build_status_per_shard[cpu_id].emplace_back(view_build_status{ + std::move(view), + std::move(first_token), + std::move(next_token_opt)}); + } + } + + std::unordered_set loaded_views; + if (view_build_status_per_shard.size() != smp::count) { + reshard(std::move(view_build_status_per_shard), loaded_views); + } else if (!view_build_status_per_shard.empty()) { + for (auto& status : view_build_status_per_shard[engine().cpu_id()]) { + load_view_status(std::move(status), loaded_views); + } + } + + for (auto& [_, build_step] : _base_to_build_step) { + boost::sort(build_step.build_status, [] (view_build_status s1, view_build_status s2) { + return *s1.next_token < *s2.next_token; + }); + if (!build_step.build_status.empty()) { + build_step.current_key = dht::decorated_key{*build_step.build_status.front().next_token, partition_key::make_empty()}; + } + } + + auto all_views = _db.get_views(); + auto is_new = [&] (const view_ptr& v) { + return base_table_exists(v) && loaded_views.find(v->id()) == loaded_views.end() + && built_views.find(v->id()) == built_views.end(); + }; + for (auto&& view : all_views | boost::adaptors::filtered(is_new)) { + bookkeeping_ops->push_back(add_new_view(view, get_or_create_build_step(view->view_info()->base_id()))); + } + + for (auto& [_, build_step] : _base_to_build_step) { + initialize_reader_at_current_token(build_step); + } + + auto f = seastar::when_all_succeed(bookkeeping_ops->begin(), bookkeeping_ops->end()); + return f.handle_exception([bookkeeping_ops = std::move(bookkeeping_ops)] (std::exception_ptr ep) { + vlogger.error("Failed to update materialized view bookkeeping ({}), continuing anyway.", ep); + }); +} + +future<> view_builder::add_new_view(view_ptr view, build_step& step) { + vlogger.info0("Building view {}.{}, starting at token {}", view->ks_name(), view->cf_name(), step.current_token()); + step.build_status.emplace(step.build_status.begin(), view_build_status{view, step.current_token(), std::nullopt}); + return when_all_succeed( + system_keyspace::register_view_for_building(view->ks_name(), view->cf_name(), step.current_token()), + _sys_dist_ks.start_view_build(view->ks_name(), view->cf_name())); +} + +future<> view_builder::do_build_step() { + return make_ready_future<>(); +} + } // namespace view } // namespace db diff --git a/db/view/view_builder.hh b/db/view/view_builder.hh new file mode 100644 index 0000000000..f75365f1ed --- /dev/null +++ b/db/view/view_builder.hh @@ -0,0 +1,162 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include "database_fwd.hh" +#include "db/system_keyspace.hh" +#include "db/system_distributed_keyspace.hh" +#include "dht/i_partitioner.hh" +#include "keys.hh" +#include "query-request.hh" +#include "service/migration_listener.hh" +#include "service/migration_manager.hh" +#include "sstables/sstable_set.hh" +#include "utils/exponential_backoff_retry.hh" +#include "utils/serialized_action.hh" +#include "utils/UUID.hh" + +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace db::view { + +/** + * The view_builder is a sharded service responsible for building all defined materialized views. + * This process entails walking over the existing data in a given base table, and using it to + * calculate and insert the respective entries for one or more views. + * + * We employ a flat_mutation_reader for each base table for which we're building views. + * + * View building is necessarily a sharded process. That means that on restart, if the number of shards + * has changed, we need to calculate the most conservative token range that has been built, and build + * the remainder. + * + * Interaction with the system tables: + * - When we start building a view, we add an entry to the scylla_views_builds_in_progress + * system table. If the node restarts at this point, we'll consider these newly inserted + * views as having made no progress, and we'll treat them as new views; + * - When we finish a build step, we update the progress of the views that we built during + * this step by writing the next token to the scylla_views_builds_in_progress table. If + * the node restarts here, we'll start building the views at the token in the next_token column. + * - When we finish building a view, we mark it as completed in the built views system table, and + * remove it from the in-progress system table. Under failure, the following can happen: + * * When we fail to mark the view as built, we'll redo the last step upon node reboot; + * * When we fail to delete the in-progress record, upon reboot we'll remove this record. + * A view is marked as completed only when all shards have finished their share of the work, that is, + * if a view is not built, then all shards will still have an entry in the in-progress system table, + * - A view that a shard finished building, but not all other shards, remains in the in-progress system + * table, with first_token == next_token. + * Interaction with the distributed system table (view_build_status): + * - When we start building a view, we mark the view build as being in-progress; + * - When we finish building a view, we mark the view as being built. Upon failure, + * we ensure that if the view is in the in-progress system table, then it may not + * have been written to this table. We don't load the built views from this table + * when starting. When starting, the following happens: + * * If the view is in the system.built_views table and not the in-progress + * system table, then it will be in view_build_status; + * * If the view is in the system.built_views table and not in this one, it + * will still be in the in-progress system table - we detect this and mark + * it as built in this table too, keeping the invariant; + * * If the view is in this table but not in system.built_views, then it will + * also be in the in-progress system table - we don't detect this and will + * redo the missing step, for simplicity. + */ +class view_builder final { + /** + * Keeps track of the build progress for a particular view. + * When the view is built, next_token == first_token. + */ + struct view_build_status final { + view_ptr view; + dht::token first_token; + std::optional next_token; + }; + + /** + * Keeps track of the build progress for all the views of a particular + * base table. Each execution of the build step comprises a query of + * the base table for the selected range. + * + * We pin the set of sstables that potentially contain data that should be added to a + * view (they are pinned by the flat_mutation_reader). Adding a view v' overwrites the + * set of pinned sstables, regardless of there being another view v'' being built. The + * new set will potentially contain new data already in v'', written as part of the write + * path. We assume this case is rare and optimize for fewer disk space in detriment of + * network bandwidth. + */ + struct build_step final { + // Ensure we pin the column_family. It may happen that all views are removed, + // and that the base table is too before we can detect it. + lw_shared_ptr base; + query::partition_slice pslice; + dht::partition_range prange; + flat_mutation_reader reader{nullptr}; + dht::decorated_key current_key{dht::minimum_token(), partition_key::make_empty()}; + std::vector build_status; + + const dht::token& current_token() const { + return current_key.token(); + } + }; + + using base_to_build_step_type = std::unordered_map; + + database& _db; + db::system_distributed_keyspace& _sys_dist_ks; + base_to_build_step_type _base_to_build_step; + base_to_build_step_type::iterator _current_step = _base_to_build_step.end(); + serialized_action _build_step{std::bind(&view_builder::do_build_step, this)}; + +public: + view_builder(database&, db::system_distributed_keyspace&); + view_builder(view_builder&&) = delete; + + /** + * Loads the state stored in the system tables to resume building the existing views. + * Requires that all views have been loaded from the system tables and are accessible + * through the database, and that the commitlog has been replayed. + */ + future<> start(); + + /** + * Stops the view building process. + */ + future<> stop(); + +private: + build_step& get_or_create_build_step(utils::UUID); + void initialize_reader_at_current_token(build_step&); + void load_view_status(view_build_status, std::unordered_set&); + void reshard(std::vector>, std::unordered_set&); + future<> calculate_shard_build_step(std::vector, std::vector); + future<> add_new_view(view_ptr, build_step&); + future<> do_build_step(); +}; + +} \ No newline at end of file diff --git a/main.cc b/main.cc index 3aca8d2f54..1fe7c2c419 100644 --- a/main.cc +++ b/main.cc @@ -40,6 +40,7 @@ #include "db/commitlog/commitlog.hh" #include "db/hints/manager.hh" #include "db/commitlog/commitlog_replayer.hh" +#include "db/view/view_builder.hh" #include "utils/runtime.hh" #include "utils/file_lock.hh" #include "log.hh" @@ -691,6 +692,11 @@ int main(int ac, char** av) { proxy.invoke_on_all([] (service::storage_proxy& local_proxy) { local_proxy.start_hints_manager(gms::get_local_gossiper().shared_from_this()); }).get(); } + static sharded view_builder; + supervisor::notify("starting the view builder"); + view_builder.start(std::ref(db), std::ref(sys_dist_ks)).get(); + view_builder.invoke_on_all(&db::view::view_builder::start).get(); + supervisor::notify("starting native transport"); service::get_local_storage_service().start_native_transport().get(); if (start_thrift) { @@ -721,6 +727,10 @@ int main(int ac, char** av) { return service::get_local_storage_service().drain_on_shutdown(); }); + engine().at_exit([] { + return view_builder.stop(); + }); + engine().at_exit([&db] { return db.invoke_on_all([](auto& db) { return db.get_compaction_manager().stop(); From 3ffa3b6b5483b7f881d1574baaa714ae953b75d1 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Thu, 22 Feb 2018 11:55:55 +0000 Subject: [PATCH 14/22] service/migration_listener: Add class for view notifications Add a convenience base class for view notifications, which provides a default implementation for all other types of notifications. Signed-off-by: Duarte Nunes --- service/migration_listener.hh | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/service/migration_listener.hh b/service/migration_listener.hh index 1d201ba230..945a0b937f 100644 --- a/service/migration_listener.hh +++ b/service/migration_listener.hh @@ -73,6 +73,26 @@ public: virtual void on_drop_function(const sstring& ks_name, const sstring& function_name) = 0; virtual void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) = 0; virtual void on_drop_view(const sstring& ks_name, const sstring& view_name) = 0; + + class only_view_notifications; +}; + +class migration_listener::only_view_notifications : public migration_listener { + virtual void on_create_keyspace(const sstring& ks_name) { } + virtual void on_create_column_family(const sstring& ks_name, const sstring& cf_name) { } + virtual void on_create_user_type(const sstring& ks_name, const sstring& type_name) { } + virtual void on_create_function(const sstring& ks_name, const sstring& function_name) { } + virtual void on_create_aggregate(const sstring& ks_name, const sstring& aggregate_name) { } + virtual void on_update_keyspace(const sstring& ks_name) { } + virtual void on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool columns_changed) { } + virtual void on_update_user_type(const sstring& ks_name, const sstring& type_name) { } + virtual void on_update_function(const sstring& ks_name, const sstring& function_name) { } + virtual void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) { } + virtual void on_drop_keyspace(const sstring& ks_name) { } + virtual void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) { } + virtual void on_drop_user_type(const sstring& ks_name, const sstring& type_name) { } + virtual void on_drop_function(const sstring& ks_name, const sstring& function_name) { } + virtual void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) { } }; } From a21efeffa04a27f8269a1006e2207d4c29e5610e Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Mon, 12 Mar 2018 20:58:55 +0000 Subject: [PATCH 15/22] db/view/view_builder: React to schema changes The view_builder now uses the migration_manager to subscribe to schema change events, and update its bookkeeping accordingly. We prefer this to having the database call into the view_builder, as that would create a cyclic dependency. We serialize changes to the views of a particular base table, such that schema changes do not interfere with the upcoming view building code. Signed-off-by: Duarte Nunes --- db/view/view.cc | 93 +++++++++++++++++++++++++++++++++++++++-- db/view/view_builder.hh | 14 ++++++- main.cc | 2 +- 3 files changed, 103 insertions(+), 6 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index ef6bcdeeed..2d6332ab46 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -915,9 +915,10 @@ future<> mutate_MV(const dht::token& base_token, return f.finally([fs = std::move(fs)] { }); } -view_builder::view_builder(database& db, db::system_distributed_keyspace& sys_dist_ks) +view_builder::view_builder(database& db, db::system_distributed_keyspace& sys_dist_ks, service::migration_manager& mm) : _db(db) - , _sys_dist_ks(sys_dist_ks) { + , _sys_dist_ks(sys_dist_ks) + , _mm(mm) { } future<> view_builder::start() { @@ -925,6 +926,7 @@ future<> view_builder::start() { auto built = system_keyspace::load_built_views().get0(); auto in_progress = system_keyspace::load_view_build_progress().get0(); calculate_shard_build_step(std::move(built), std::move(in_progress)).get(); + _mm.register_listener(this); _current_step = _base_to_build_step.begin(); _build_step.trigger(); }); @@ -932,7 +934,12 @@ future<> view_builder::start() { future<> view_builder::stop() { vlogger.info("Stopping view builder"); - return _build_step.join(); + _mm.unregister_listener(this); + _as.request_abort(); + return _sem.wait().then([this] { + _sem.broken(); + return _build_step.join(); + }); } static query::partition_slice make_partition_slice(const schema& s) { @@ -1166,6 +1173,86 @@ future<> view_builder::add_new_view(view_ptr view, build_step& step) { _sys_dist_ks.start_view_build(view->ks_name(), view->cf_name())); } +static future<> flush_base(lw_shared_ptr base, abort_source& as) { + struct empty_state { }; + return exponential_backoff_retry::do_until_value(1s, 1min, as, [base = std::move(base)] { + return base->flush().then_wrapped([base] (future<> f) -> stdx::optional { + if (f.failed()) { + vlogger.error("Error flushing base table {}.{}: {}; retrying", base->schema()->ks_name(), base->schema()->cf_name(), f.get_exception()); + return { }; + } + return { empty_state{} }; + }); + }).discard_result(); +} + +void view_builder::on_create_view(const sstring& ks_name, const sstring& view_name) { + with_semaphore(_sem, 1, [ks_name, view_name, this] { + auto view = view_ptr(_db.find_schema(ks_name, view_name)); + auto& step = get_or_create_build_step(view->view_info()->base_id()); + return step.base->await_pending_writes().then([this, &step] { + return flush_base(step.base, _as); + }).then([this, view, &step] () mutable { + // This resets the build step to the current token. It may result in views currently + // being built to receive duplicate updates, but it simplifies things as we don't have + // to keep around a list of new views to build the next time the reader crosses a token + // threshold. + initialize_reader_at_current_token(step); + return add_new_view(view, step).then_wrapped([this, view] (future<>&& f) { + if (f.failed()) { + vlogger.error("Error setting up view for building {}.{}: {}", view->ks_name(), view->cf_name(), f.get_exception()); + } + _build_step.trigger(); + }); + }); + }).handle_exception_type([] (no_such_column_family&) { }); +} + +void view_builder::on_update_view(const sstring& ks_name, const sstring& view_name, bool) { + with_semaphore(_sem, 1, [ks_name, view_name, this] { + auto view = view_ptr(_db.find_schema(ks_name, view_name)); + auto step_it = _base_to_build_step.find(view->view_info()->base_id()); + if (step_it == _base_to_build_step.end()) { + return;// In case all the views for this CF have finished building already. + } + auto status_it = boost::find_if(step_it->second.build_status, [view] (const view_build_status& bs) { + return bs.view->id() == view->id(); + }); + if (status_it != step_it->second.build_status.end()) { + status_it->view = std::move(view); + } + }).handle_exception_type([] (no_such_column_family&) { }); +} + +void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name) { + vlogger.info0("Stopping to build view {}.{}", ks_name, view_name); + with_semaphore(_sem, 1, [ks_name, view_name, this] { + // The view is absent from the database at this point, so find it by brute force. + ([&, this] { + for (auto& [_, step] : _base_to_build_step) { + if (step.build_status.empty() || step.build_status.front().view->ks_name() != ks_name) { + continue; + } + for (auto it = step.build_status.begin(); it != step.build_status.end(); ++it) { + if (it->view->cf_name() == view_name) { + step.build_status.erase(it); + return; + } + } + } + })(); + if (engine().cpu_id() != 0) { + return make_ready_future(); + } + return when_all_succeed( + system_keyspace::remove_view_build_progress_across_all_shards(ks_name, view_name), + system_keyspace::remove_built_view(ks_name, view_name), + _sys_dist_ks.remove_view(ks_name, view_name)).handle_exception([ks_name, view_name] (std::exception_ptr ep) { + vlogger.warn("Failed to cleanup view {}.{}: {}", ks_name, view_name, ep); + }); + }); +} + future<> view_builder::do_build_step() { return make_ready_future<>(); } diff --git a/db/view/view_builder.hh b/db/view/view_builder.hh index f75365f1ed..15b7ff344c 100644 --- a/db/view/view_builder.hh +++ b/db/view/view_builder.hh @@ -87,7 +87,7 @@ namespace db::view { * also be in the in-progress system table - we don't detect this and will * redo the missing step, for simplicity. */ -class view_builder final { +class view_builder final : public service::migration_listener::only_view_notifications { /** * Keeps track of the build progress for a particular view. * When the view is built, next_token == first_token. @@ -129,12 +129,18 @@ class view_builder final { database& _db; db::system_distributed_keyspace& _sys_dist_ks; + service::migration_manager& _mm; base_to_build_step_type _base_to_build_step; base_to_build_step_type::iterator _current_step = _base_to_build_step.end(); serialized_action _build_step{std::bind(&view_builder::do_build_step, this)}; + // Ensures bookkeeping operations are serialized, meaning that while we execute + // a build step we don't consider newly added or removed views. This simplifies + // the algorithms. Also synchronizes an operation wrt. a call to stop(). + seastar::semaphore _sem{1}; + seastar::abort_source _as; public: - view_builder(database&, db::system_distributed_keyspace&); + view_builder(database&, db::system_distributed_keyspace&, service::migration_manager&); view_builder(view_builder&&) = delete; /** @@ -149,6 +155,10 @@ public: */ future<> stop(); + virtual void on_create_view(const sstring& ks_name, const sstring& view_name) override; + virtual void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override; + virtual void on_drop_view(const sstring& ks_name, const sstring& view_name) override; + private: build_step& get_or_create_build_step(utils::UUID); void initialize_reader_at_current_token(build_step&); diff --git a/main.cc b/main.cc index 1fe7c2c419..f32ddac636 100644 --- a/main.cc +++ b/main.cc @@ -694,7 +694,7 @@ int main(int ac, char** av) { static sharded view_builder; supervisor::notify("starting the view builder"); - view_builder.start(std::ref(db), std::ref(sys_dist_ks)).get(); + view_builder.start(std::ref(db), std::ref(sys_dist_ks), std::ref(mm)).get(); view_builder.invoke_on_all(&db::view::view_builder::start).get(); supervisor::notify("starting native transport"); From 1f3e3d3813c2866ab97472ecb27628d366a4859c Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Wed, 14 Mar 2018 14:50:07 +0000 Subject: [PATCH 16/22] flat_mutation_reader: Make reader from mutation fragments Builds a reader from a set of ordered mutations fragments. This is useful for building a reader out of a subset of segments returned by a different reader. It is equivalent to building a mutation out of the set of mutation fragments, and calling make_flat_mutation_reader_from_mutations, except that it doest not yet support fast-forwarding. Signed-off-by: Duarte Nunes --- flat_mutation_reader.cc | 34 ++++++++++++++++++++++++++++++++++ flat_mutation_reader.hh | 5 +++++ 2 files changed, 39 insertions(+) diff --git a/flat_mutation_reader.cc b/flat_mutation_reader.cc index 2d30b4d411..67b648ec80 100644 --- a/flat_mutation_reader.cc +++ b/flat_mutation_reader.cc @@ -621,3 +621,37 @@ make_flat_multi_range_reader(schema_ptr s, mutation_source source, const dht::pa return make_flat_mutation_reader(std::move(s), std::move(source), ranges, slice, pc, std::move(trace_state), fwd_mr); } + +flat_mutation_reader +make_flat_mutation_reader_from_fragments(schema_ptr schema, std::deque fragments) { + class reader : public flat_mutation_reader::impl { + std::deque _fragments; + public: + reader(schema_ptr schema, std::deque fragments) + : flat_mutation_reader::impl(std::move(schema)) + , _fragments(std::move(fragments)) { + } + virtual future<> fill_buffer(db::timeout_clock::time_point) override { + while (!(_end_of_stream = _fragments.empty()) && !is_buffer_full()) { + push_mutation_fragment(std::move(_fragments.front())); + _fragments.pop_front(); + } + return make_ready_future<>(); + } + virtual void next_partition() override { + clear_buffer_to_next_partition(); + if (is_buffer_empty()) { + while (!(_end_of_stream = _fragments.empty()) && !_fragments.front().is_partition_start()) { + _fragments.pop_front(); + } + } + } + virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override { + throw std::runtime_error("This reader can't be fast forwarded to another range."); + } + virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { + throw std::runtime_error("This reader can't be fast forwarded to another position."); + } + }; + return make_flat_mutation_reader(std::move(schema), std::move(fragments)); +} diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh index 3aafedf71a..fc62086e2a 100644 --- a/flat_mutation_reader.hh +++ b/flat_mutation_reader.hh @@ -33,6 +33,8 @@ #include #include "db/timeout_clock.hh" +#include + using seastar::future; class mutation_source; @@ -557,6 +559,9 @@ make_flat_multi_range_reader(schema_ptr s, mutation_source source, const dht::pa tracing::trace_state_ptr trace_state = nullptr, flat_mutation_reader::partition_range_forwarding fwd_mr = flat_mutation_reader::partition_range_forwarding::yes); +flat_mutation_reader +make_flat_mutation_reader_from_fragments(schema_ptr, std::deque); + // Calls the consumer for each element of the reader's stream until end of stream // is reached or the consumer requests iteration to stop by returning stop_iteration::yes. // The consumer should accept mutation as the argument and return stop_iteration. From 5f822e3928f9861b7040700e2214adca5744a3cd Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Mon, 12 Mar 2018 21:01:15 +0000 Subject: [PATCH 17/22] db/view/view_builder: Actually build views This patch adds the missing view building code to the eponymous class. We consume from the reader associated with each base table until all its views are built. If the reader reaches the end and there are incomplete views, then a view was added while others were being built. In such cases, we restart the reader to the beginning of the current token, but not to the beginning of the token range, when the view is added. Then, when we exhaust the reader, we simply create a new one for the whole token range, and resume building the pending views. We aim to be resource-conscious. On a given shard, at any given moment, we consume at most from one reader. We also strive for fairness, in that each build step inserts entries for the views of a different base. Each build step reads and generates updates for batch_size rows. We lack a controller, which could potentially allow us to go faster (to execute multiple steps at the same time, or consume more rows per batch), and also which would apply backpressure, so we could, for example, delay executing a build step. Signed-off-by: Duarte Nunes --- db/view/view.cc | 231 +++++++++++++++++++++++++++++++++++++++- db/view/view_builder.hh | 19 +++- keys.hh | 4 + 3 files changed, 251 insertions(+), 3 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 2d6332ab46..cbd99bc519 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -39,6 +39,7 @@ * along with Scylla. If not, see . */ +#include #include #include #include @@ -60,6 +61,8 @@ #include "gms/inet_address.hh" #include "keys.hh" #include "locator/network_topology_strategy.hh" +#include "mutation.hh" +#include "mutation_partition.hh" #include "service/migration_manager.hh" #include "service/storage_service.hh" #include "view_info.hh" @@ -1235,6 +1238,7 @@ void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name } for (auto it = step.build_status.begin(); it != step.build_status.end(); ++it) { if (it->view->cf_name() == view_name) { + _built_views.erase(it->view->id()); step.build_status.erase(it); return; } @@ -1254,9 +1258,232 @@ void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name } future<> view_builder::do_build_step() { - return make_ready_future<>(); + return seastar::async([this] { + exponential_backoff_retry r(1s, 1min); + while (!_base_to_build_step.empty() && !_as.abort_requested()) { + auto units = get_units(_sem, 1).get0(); + try { + execute(_current_step->second, exponential_backoff_retry(1s, 1min)); + r.reset(); + } catch (const abort_requested_exception&) { + return; + } catch (...) { + auto base = _current_step->second.base->schema(); + vlogger.warn("Error executing build step for base {}.{}: {}", base->ks_name(), base->cf_name(), std::current_exception()); + r.retry(_as).get(); + initialize_reader_at_current_token(_current_step->second); + } + if (_current_step->second.build_status.empty()) { + _current_step = _base_to_build_step.erase(_current_step); + } else { + ++_current_step; + } + if (_current_step == _base_to_build_step.end()) { + _current_step = _base_to_build_step.begin(); + } + } + }); +} + +// Called in the context of a seastar::thread. +class view_builder::consumer { +public: + struct built_views { + build_step& step; + std::vector views; + + built_views(build_step& step) + : step(step) { + } + + built_views(built_views&& other) + : step(other.step) + , views(std::move(other.views)) { + } + + ~built_views() { + for (auto&& status : views) { + std::cout << "putting " << status.view->cf_name() << " back\n"; + // Use step.current_token(), which may have wrapped around and become < first_token. + step.build_status.emplace_back(view_build_status{std::move(status.view), step.current_token(), step.current_token()}); + } + } + + void release() { + views.clear(); + } + }; + +private: + view_builder& _builder; + build_step& _step; + built_views _built_views; + std::vector _views_to_build; + std::deque _fragments; + +public: + consumer(view_builder& builder, build_step& step) + : _builder(builder) + , _step(step) + , _built_views{step} { + if (!step.current_key.key().is_empty(*_step.reader.schema())) { + load_views_to_build(); + } + } + + void load_views_to_build() { + for (auto&& vs : _step.build_status) { + if (_step.current_token() >= vs.next_token) { + if (partition_key_matches(*_step.reader.schema(), *vs.view->view_info(), _step.current_key)) { + _views_to_build.push_back(vs.view); + } + if (vs.next_token || _step.current_token() != vs.first_token) { + vs.next_token = _step.current_key.token(); + } + } else { + break; + } + } + } + + void check_for_built_views() { + for (auto it = _step.build_status.begin(); it != _step.build_status.end();) { + // A view starts being built at token t1. Due to resharding, that may not necessarily be a + // shard-owned token. We finish building the view when the next_token to build is just before + // (or at) the first token, but the shard-owned current token is after (or at) the first token. + // In the system tables, we set first_token = next_token to signal the completion of the build + // process in case of a restart. + if (it->next_token && *it->next_token <= it->first_token && _step.current_token() >= it->first_token) { + _built_views.views.push_back(std::move(*it)); + it = _step.build_status.erase(it); + } else { + ++it; + } + } + } + + stop_iteration consume_new_partition(const dht::decorated_key& dk) { + _step.current_key = std::move(dk); + check_for_built_views(); + _views_to_build.clear(); + load_views_to_build(); + return stop_iteration(_views_to_build.empty()); + } + + stop_iteration consume(tombstone) { + return stop_iteration::no; + } + + stop_iteration consume(static_row&&, tombstone, bool) { + return stop_iteration::no; + } + + stop_iteration consume(clustering_row&& cr, row_tombstone, bool) { + if (_views_to_build.empty() || _builder._as.abort_requested()) { + return stop_iteration::yes; + } + + _fragments.push_back(std::move(cr)); + return stop_iteration::no; + } + + stop_iteration consume(range_tombstone&&) { + return stop_iteration::no; + } + + stop_iteration consume_end_of_partition() { + _builder._as.check(); + if (!_fragments.empty()) { + _fragments.push_front(partition_start(_step.current_key, tombstone())); + _step.base->populate_views( + _views_to_build, + _step.current_token(), + make_flat_mutation_reader_from_fragments(_step.base->schema(), std::move(_fragments))).get(); + _fragments.clear(); + } + return stop_iteration(_step.build_status.empty()); + } + + built_views consume_end_of_stream() { + if (vlogger.is_enabled(log_level::debug)) { + auto view_names = boost::copy_range>( + _views_to_build | boost::adaptors::transformed([](auto v) { + return v->cf_name(); + })); + vlogger.debug("Completed build step for base {}.{}, at token {}; views={}", _step.base->schema()->ks_name(), + _step.base->schema()->cf_name(), _step.current_token(), view_names); + } + if (_step.reader.is_end_of_stream() && _step.reader.is_buffer_empty()) { + _step.current_key = {dht::minimum_token(), partition_key::make_empty()}; + for (auto&& vs : _step.build_status) { + vs.next_token = dht::minimum_token(); + } + _builder.initialize_reader_at_current_token(_step); + check_for_built_views(); + } + return std::move(_built_views); + } +}; + +// Called in the context of a seastar::thread. +void view_builder::execute(build_step& step, exponential_backoff_retry r) { + auto consumer = compact_for_query( + *step.reader.schema(), + gc_clock::now(), + step.pslice, + batch_size, + query::max_partitions, + view_builder::consumer{*this, step}); + consumer.consume_new_partition(step.current_key); // Initialize the state in case we're resuming a partition + auto built = step.reader.consume_in_thread(std::move(consumer)); + + _as.check(); + + std::vector> bookkeeping_ops; + bookkeeping_ops.reserve(built.views.size() + step.build_status.size()); + for (auto& [view, first_token, _] : built.views) { + bookkeeping_ops.push_back(maybe_mark_view_as_built(view, first_token)); + } + built.release(); + for (auto& [view, _, next_token] : step.build_status) { + if (next_token) { + bookkeeping_ops.push_back( + system_keyspace::update_view_build_progress(view->ks_name(), view->cf_name(), *next_token)); + } + } + seastar::when_all_succeed(bookkeeping_ops.begin(), bookkeeping_ops.end()).handle_exception([] (std::exception_ptr ep) { + vlogger.error("Failed to update materialized view bookkeeping ({}), continuing anyway.", ep); + }).get(); +} + +future<> view_builder::maybe_mark_view_as_built(view_ptr view, dht::token next_token) { + _built_views.emplace(view->id()); + vlogger.debug("Shard finished building view {}.{}", view->ks_name(), view->cf_name()); + return container().map_reduce0( + [view_id = view->id()] (view_builder& builder) { + return builder._built_views.count(view_id); + }, + true, + [] (bool result, bool shard_complete) { + return result & shard_complete; + }).then([this, view, next_token = std::move(next_token)] (bool built) { + if (built) { + return container().invoke_on_all([view_id = view->id()] (view_builder& builder) { + if (builder._built_views.erase(view_id) == 0 || engine().cpu_id() != 0) { + return make_ready_future<>(); + } + auto view = builder._db.find_schema(view_id); + vlogger.info("Finished building view {}.{}", view->ks_name(), view->cf_name()); + return seastar::when_all_succeed( + system_keyspace::mark_view_as_built(view->ks_name(), view->cf_name()), + builder._sys_dist_ks.finish_view_build(view->ks_name(), view->cf_name())).then([view] { + return system_keyspace::remove_view_build_progress_across_all_shards(view->ks_name(), view->cf_name()); + }); + }); + } + return system_keyspace::update_view_build_progress(view->ks_name(), view->cf_name(), next_token); + }); } } // namespace view } // namespace db - diff --git a/db/view/view_builder.hh b/db/view/view_builder.hh index 15b7ff344c..959a914384 100644 --- a/db/view/view_builder.hh +++ b/db/view/view_builder.hh @@ -53,6 +53,14 @@ namespace db::view { * * We employ a flat_mutation_reader for each base table for which we're building views. * + * We aim to be resource-conscious. On a given shard, at any given moment, we consume at most + * from one reader. We also strive for fairness, in that each build step inserts entries for + * the views of a different base. Each build step reads and generates updates for batch_size rows. + * + * We lack a controller, which could potentially allow us to go faster (to execute multiple steps at + * the same time, or consume more rows per batch), and also which would apply backpressure, so we + * could, for example, delay executing a build step. + * * View building is necessarily a sharded process. That means that on restart, if the number of shards * has changed, we need to calculate the most conservative token range that has been built, and build * the remainder. @@ -87,7 +95,7 @@ namespace db::view { * also be in the in-progress system table - we don't detect this and will * redo the missing step, for simplicity. */ -class view_builder final : public service::migration_listener::only_view_notifications { +class view_builder final : public service::migration_listener::only_view_notifications, public seastar::peering_sharded_service { /** * Keeps track of the build progress for a particular view. * When the view is built, next_token == first_token. @@ -138,6 +146,11 @@ class view_builder final : public service::migration_listener::only_view_notific // the algorithms. Also synchronizes an operation wrt. a call to stop(). seastar::semaphore _sem{1}; seastar::abort_source _as; + // Used to coordinate between shards the conclusion of the build process for a particular view. + std::unordered_set _built_views; + +public: + static constexpr size_t batch_size = 128; public: view_builder(database&, db::system_distributed_keyspace&, service::migration_manager&); @@ -167,6 +180,10 @@ private: future<> calculate_shard_build_step(std::vector, std::vector); future<> add_new_view(view_ptr, build_step&); future<> do_build_step(); + void execute(build_step&, exponential_backoff_retry); + future<> maybe_mark_view_as_built(view_ptr, dht::token); + + struct consumer; }; } \ No newline at end of file diff --git a/keys.hh b/keys.hh index 81f112c7e1..9fce736d59 100644 --- a/keys.hh +++ b/keys.hh @@ -304,6 +304,10 @@ public: return get_compound_type(s)->end(_bytes); } + bool is_empty(const schema& s) const { + return begin(s) == end(s); + } + // Returns a range of bytes_view auto components() const { return TopLevelView::compound::element_type::components(representation()); From a45fa8eaa2051b4faa84196257f3bc3271fea658 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Wed, 14 Mar 2018 15:11:49 +0000 Subject: [PATCH 18/22] db/view/view_builder: Allow synchronizing with the end of a build Intended for use by unit tests, this patch allows synchronizing with the end of a build for a particular view. Signed-off-by: Duarte Nunes --- db/view/view.cc | 12 ++++++++++++ db/view/view_builder.hh | 7 +++++++ 2 files changed, 19 insertions(+) diff --git a/db/view/view.cc b/db/view/view.cc index cbd99bc519..5b3e135c08 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1478,6 +1478,11 @@ future<> view_builder::maybe_mark_view_as_built(view_ptr view, dht::token next_t system_keyspace::mark_view_as_built(view->ks_name(), view->cf_name()), builder._sys_dist_ks.finish_view_build(view->ks_name(), view->cf_name())).then([view] { return system_keyspace::remove_view_build_progress_across_all_shards(view->ks_name(), view->cf_name()); + }).then([&builder, view] { + auto it = builder._build_notifiers.find(std::pair(view->ks_name(), view->cf_name())); + if (it != builder._build_notifiers.end()) { + it->second.set_value(); + } }); }); } @@ -1485,5 +1490,12 @@ future<> view_builder::maybe_mark_view_as_built(view_ptr view, dht::token next_t }); } +future<> view_builder::wait_until_built(const sstring& ks_name, const sstring& view_name, lowres_clock::time_point timeout) { + return container().invoke_on(0, [ks_name, view_name, timeout] (view_builder& builder) { + auto v = std::pair(std::move(ks_name), std::move(view_name)); + return builder._build_notifiers[std::move(v)].get_shared_future(timeout); + }); +} + } // namespace view } // namespace db diff --git a/db/view/view_builder.hh b/db/view/view_builder.hh index 959a914384..18d7780327 100644 --- a/db/view/view_builder.hh +++ b/db/view/view_builder.hh @@ -36,8 +36,10 @@ #include #include +#include #include #include +#include #include #include @@ -148,6 +150,8 @@ class view_builder final : public service::migration_listener::only_view_notific seastar::abort_source _as; // Used to coordinate between shards the conclusion of the build process for a particular view. std::unordered_set _built_views; + // Used for testing. + std::unordered_map, seastar::shared_promise<>, utils::tuple_hash> _build_notifiers; public: static constexpr size_t batch_size = 128; @@ -172,6 +176,9 @@ public: virtual void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override; virtual void on_drop_view(const sstring& ks_name, const sstring& view_name) override; + // For tests + future<> wait_until_built(const sstring& ks_name, const sstring& view_name, lowres_clock::time_point timeout); + private: build_step& get_or_create_build_step(utils::UUID); void initialize_reader_at_current_token(build_step&); From a2c94e7925bbdce6fd53acb057306cb28439e7ee Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Tue, 13 Mar 2018 00:45:20 +0000 Subject: [PATCH 19/22] tests/cql_test_env: Start the view_builder Signed-off-by: Duarte Nunes --- tests/cql_test_env.cc | 23 +++++++++++++++++++++-- tests/cql_test_env.hh | 6 ++++++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/tests/cql_test_env.cc b/tests/cql_test_env.cc index fad14ee8c0..0b152d2860 100644 --- a/tests/cql_test_env.cc +++ b/tests/cql_test_env.cc @@ -39,6 +39,7 @@ #include "tmpdir.hh" #include "db/query_context.hh" #include "test_services.hh" +#include "db/view/view_builder.hh" // TODO: remove (#293) #include "message/messaging_service.hh" @@ -89,6 +90,7 @@ public: private: ::shared_ptr> _db; ::shared_ptr> _auth_service; + ::shared_ptr> _view_builder; lw_shared_ptr _data_dir; private: struct core_local_state { @@ -113,7 +115,13 @@ private: return ::make_shared(_core_local.local().client_state); } public: - single_node_cql_env(::shared_ptr> db, ::shared_ptr> auth_service) : _db(db), _auth_service(std::move(auth_service)) + single_node_cql_env( + ::shared_ptr> db, + ::shared_ptr> auth_service, + ::shared_ptr> view_builder) + : _db(db) + , _auth_service(std::move(auth_service)) + , _view_builder(std::move(view_builder)) { } virtual future<::shared_ptr> execute_cql(const sstring& text) override { @@ -256,6 +264,10 @@ public: return _auth_service->local(); } + virtual db::view::view_builder& local_view_builder() override { + return _view_builder->local(); + } + future<> start() { return _core_local.start(std::ref(*_auth_service)); } @@ -372,6 +384,13 @@ public: auth_service->stop().get(); }); + auto view_builder = ::make_shared>(); + view_builder->start(std::ref(*db), std::ref(sys_dist_ks), std::ref(mm)).get(); + view_builder->invoke_on_all(&db::view::view_builder::start).get(); + auto stop_view_builder = defer([view_builder] { + view_builder->stop().get(); + }); + // Create the testing user. try { auth::role_config config; @@ -387,7 +406,7 @@ public: // The default user may already exist if this `cql_test_env` is starting with previously populated data. } - single_node_cql_env env(db, auth_service); + single_node_cql_env env(db, auth_service, view_builder); env.start().get(); auto stop_env = defer([&env] { env.stop().get(); }); diff --git a/tests/cql_test_env.hh b/tests/cql_test_env.hh index ebe5236852..b3136aeff0 100644 --- a/tests/cql_test_env.hh +++ b/tests/cql_test_env.hh @@ -38,6 +38,10 @@ class database; +namespace db::view { +class view_builder; +} + namespace auth { class service; } @@ -95,6 +99,8 @@ public: virtual distributed & qp() = 0; virtual auth::service& local_auth_service() = 0; + + virtual db::view::view_builder& local_view_builder() = 0; }; future<> do_with_cql_env(std::function(cql_test_env&)> func); From 85285840566ff9e2667c0eda2de1a0818a6bdfd1 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Sun, 18 Mar 2018 10:06:27 +0000 Subject: [PATCH 20/22] tests/cql_assertions: Assert result set is not empty Signed-off-by: Duarte Nunes --- tests/cql_assertions.cc | 9 +++++++++ tests/cql_assertions.hh | 1 + 2 files changed, 10 insertions(+) diff --git a/tests/cql_assertions.cc b/tests/cql_assertions.cc index 9c9ba409ca..c2f68f679f 100644 --- a/tests/cql_assertions.cc +++ b/tests/cql_assertions.cc @@ -54,6 +54,15 @@ rows_assertions::is_empty() { return {*this}; } +rows_assertions +rows_assertions::is_not_empty() { + auto row_count = _rows->rs().size(); + if (row_count == 0) { + fail("Expected some rows, but was result was empty"); + } + return {*this}; +} + rows_assertions rows_assertions::with_row(std::initializer_list values) { std::vector expected_row(values); diff --git a/tests/cql_assertions.hh b/tests/cql_assertions.hh index 79c6b73ced..26192df6ef 100644 --- a/tests/cql_assertions.hh +++ b/tests/cql_assertions.hh @@ -33,6 +33,7 @@ public: rows_assertions(shared_ptr rows); rows_assertions with_size(size_t size); rows_assertions is_empty(); + rows_assertions is_not_empty(); rows_assertions with_row(std::initializer_list values); // Verifies that the result has the following rows and only that rows, in that order. From e5031f70efe4cbf4ced871347b981787e5edba04 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Thu, 22 Mar 2018 18:36:53 +0000 Subject: [PATCH 21/22] tests/cql_test_env: Move eventually() to this file Move eventually() from view_schema_test to cql_test_env. Signed-off-by: Duarte Nunes --- tests/cql_test_env.hh | 17 +++++++++++++++++ tests/view_schema_test.cc | 18 ------------------ 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/tests/cql_test_env.hh b/tests/cql_test_env.hh index b3136aeff0..173e6c5e2d 100644 --- a/tests/cql_test_env.hh +++ b/tests/cql_test_env.hh @@ -107,3 +107,20 @@ future<> do_with_cql_env(std::function(cql_test_env&)> func); future<> do_with_cql_env(std::function(cql_test_env&)> func, const db::config&); future<> do_with_cql_env_thread(std::function func); future<> do_with_cql_env_thread(std::function func, const db::config&); + +template +static void eventually(EventuallySucceedingFunction&& f, size_t max_attempts = 10) { + size_t attempts = 0; + while (true) { + try { + f(); + break; + } catch (...) { + if (++attempts < max_attempts) { + sleep(std::chrono::milliseconds(1 << attempts)).get0(); + } else { + throw; + } + } + } +} \ No newline at end of file diff --git a/tests/view_schema_test.cc b/tests/view_schema_test.cc index a4aaab989a..b9b67cd4a3 100644 --- a/tests/view_schema_test.cc +++ b/tests/view_schema_test.cc @@ -33,24 +33,6 @@ using namespace std::literals::chrono_literals; -template -static void eventually(EventuallySucceedingFunction&& f) { - constexpr unsigned max_attempts = 10; - unsigned attempts = 0; - while (true) { - try { - f(); - break; - } catch (...) { - if (++attempts < max_attempts) { - sleep(std::chrono::milliseconds(1 << attempts)).get0(); - } else { - throw; - } - } - } -} - SEASTAR_TEST_CASE(test_case_sensitivity) { return do_with_cql_env_thread([] (auto& e) { e.execute_cql("create table cf (theKey int, theClustering int, theValue int, primary key (theKey, theClustering));").get(); From 9f5cfa76f7a8aa84c5d637ff69d0f79484a01ccb Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Tue, 13 Mar 2018 00:46:14 +0000 Subject: [PATCH 22/22] tests/view_build_test: Add tests for view building This is a separate file from view_schema_test because that one is already becoming too long to run; also, having multiple test files means they can be executed in parallel. Signed-off-by: Duarte Nunes --- configure.py | 1 + test.py | 1 + tests/view_build_test.cc | 342 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 344 insertions(+) create mode 100644 tests/view_build_test.cc diff --git a/configure.py b/configure.py index 9ee18f1549..593783a93a 100755 --- a/configure.py +++ b/configure.py @@ -273,6 +273,7 @@ scylla_tests = [ 'tests/input_stream_test', 'tests/virtual_reader_test', 'tests/view_schema_test', + 'tests/view_build_test', 'tests/counter_test', 'tests/cell_locker_test', 'tests/row_locker_test', diff --git a/test.py b/test.py index d5a88c181b..5cf316641f 100755 --- a/test.py +++ b/test.py @@ -88,6 +88,7 @@ boost_tests = [ 'counter_test', 'cell_locker_test', 'view_schema_test', + 'view_build_test', 'clustering_ranges_walker_test', 'vint_serialization_test', 'duration_test', diff --git a/tests/view_build_test.cc b/tests/view_build_test.cc new file mode 100644 index 0000000000..1489231eb1 --- /dev/null +++ b/tests/view_build_test.cc @@ -0,0 +1,342 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include + +#include "database.hh" +#include "db/view/view_builder.hh" +#include "db/system_keyspace.hh" + +#include "tests/test-utils.hh" +#include "tests/cql_test_env.hh" +#include "tests/cql_assertions.hh" + +using namespace std::literals::chrono_literals; + +SEASTAR_TEST_CASE(test_builder_with_large_partition) { + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("create table cf (p int, c int, v int, primary key (p, c))").get(); + + for (auto i = 0; i < 1024; ++i) { + e.execute_cql(sprint("insert into cf (p, c, v) values (0, %d, 0)", i)).get(); + } + + auto f = e.local_view_builder().wait_until_built("ks", "vcf", lowres_clock::now() + 10s); + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and c is not null and v is not null " + "primary key (v, c, p)").get(); + + f.get(); + auto built = db::system_keyspace::load_built_views().get0(); + BOOST_REQUIRE_EQUAL(built.size(), 1); + BOOST_REQUIRE_EQUAL(built[0].second, sstring("vcf")); + + auto msg = e.execute_cql("select count(*) from vcf where v = 0").get0(); + assert_that(msg).is_rows().with_size(1); + assert_that(msg).is_rows().with_rows({{{long_type->decompose(1024L)}}}); + }); +} + +SEASTAR_TEST_CASE(test_builder_with_multiple_partitions) { + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("create table cf (p int, c int, v int, primary key (p, c))").get(); + + for (auto i = 0; i < 1024; ++i) { + e.execute_cql(sprint("insert into cf (p, c, v) values (%d, %d, 0)", i % 5, i)).get(); + } + + auto f = e.local_view_builder().wait_until_built("ks", "vcf", lowres_clock::now() + 10s); + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and c is not null and v is not null " + "primary key (v, c, p)").get(); + + f.get(); + auto built = db::system_keyspace::load_built_views().get0(); + BOOST_REQUIRE_EQUAL(built.size(), 1); + BOOST_REQUIRE_EQUAL(built[0].second, sstring("vcf")); + + auto msg = e.execute_cql("select count(*) from vcf where v = 0").get0(); + assert_that(msg).is_rows().with_size(1); + assert_that(msg).is_rows().with_rows({{{long_type->decompose(1024L)}}}); + }); +} + +SEASTAR_TEST_CASE(test_builder_with_multiple_partitions_of_batch_size_rows) { + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("create table cf (p int, c int, v int, primary key (p, c))").get(); + + for (auto i = 0; i < 1024; ++i) { + e.execute_cql(sprint("insert into cf (p, c, v) values (%d, %d, 0)", i % db::view::view_builder::batch_size, i)).get(); + } + + auto f = e.local_view_builder().wait_until_built("ks", "vcf", lowres_clock::now() + 10s); + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and c is not null and v is not null " + "primary key (v, c, p)").get(); + + f.get(); + auto built = db::system_keyspace::load_built_views().get0(); + BOOST_REQUIRE_EQUAL(built.size(), 1); + BOOST_REQUIRE_EQUAL(built[0].second, sstring("vcf")); + + auto msg = e.execute_cql("select count(*) from vcf where v = 0").get0(); + assert_that(msg).is_rows().with_size(1); + assert_that(msg).is_rows().with_rows({{{long_type->decompose(1024L)}}}); + }); +} + +SEASTAR_TEST_CASE(test_builder_view_added_during_ongoing_build) { + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("create table cf (p int, c int, v int, primary key (p, c))").get(); + + for (auto i = 0; i < 5000; ++i) { + e.execute_cql(sprint("insert into cf (p, c, v) values (0, %d, 0)", i)).get(); + } + + auto f1 = e.local_view_builder().wait_until_built("ks", "vcf1", lowres_clock::now() + 60s); + auto f2 = e.local_view_builder().wait_until_built("ks", "vcf2", lowres_clock::now() + 30s); + + e.execute_cql("create materialized view vcf1 as select * from cf " + "where p is not null and c is not null and v is not null " + "primary key (v, c, p)").get(); + + sleep(1s).get(); + + e.execute_cql("create materialized view vcf2 as select * from cf " + "where p is not null and c is not null and v is not null " + "primary key (v, p, c)").get(); + + f2.get(); + f1.get(); + auto built = db::system_keyspace::load_built_views().get0(); + BOOST_REQUIRE_EQUAL(built.size(), 2); + + auto msg = e.execute_cql("select count(*) from vcf1 where v = 0").get0(); + assert_that(msg).is_rows().with_size(1); + assert_that(msg).is_rows().with_rows({{{long_type->decompose(5000L)}}}); + + msg = e.execute_cql("select count(*) from vcf2 where v = 0").get0(); + assert_that(msg).is_rows().with_size(1); + assert_that(msg).is_rows().with_rows({{{long_type->decompose(5000L)}}}); + }); +} + +std::mt19937 random_generator() { + std::random_device rd; + // In case of errors, replace the seed with a fixed value to get a deterministic run. + auto seed = rd(); + std::cout << "Random seed: " << seed << "\n"; + return std::mt19937(seed); +} + +bytes random_bytes(size_t size, std::mt19937& gen) { + bytes result(bytes::initialized_later(), size); + static thread_local std::uniform_int_distribution dist(0, std::numeric_limits::max()); + for (size_t i = 0; i < size; ++i) { + result[i] = dist(gen); + } + return result; +} + +SEASTAR_TEST_CASE(test_builder_across_tokens_with_large_partitions) { + return do_with_cql_env_thread([] (cql_test_env& e) { + auto gen = random_generator(); + + e.execute_cql("create table cf (p blob, c int, v int, primary key (p, c))").get(); + auto s = e.local_db().find_schema("ks", "cf"); + + auto make_key = [&] (auto) { return to_hex(random_bytes(128, gen)); }; + for (auto&& k : boost::irange(0, 4) | boost::adaptors::transformed(make_key)) { + for (auto i = 0; i < 1000; ++i) { + e.execute_cql(sprint("insert into cf (p, c, v) values (0x%s, %d, 0)", k, i)).get(); + } + } + + auto f1 = e.local_view_builder().wait_until_built("ks", "vcf1", lowres_clock::now() + 60s); + auto f2 = e.local_view_builder().wait_until_built("ks", "vcf2", lowres_clock::now() + 30s); + + e.execute_cql("create materialized view vcf1 as select * from cf " + "where p is not null and c is not null and v is not null " + "primary key (v, c, p)").get(); + + sleep(1s).get(); + + e.execute_cql("create materialized view vcf2 as select * from cf " + "where p is not null and c is not null and v is not null " + "primary key (v, p, c)").get(); + + f2.get(); + f1.get(); + auto built = db::system_keyspace::load_built_views().get0(); + BOOST_REQUIRE_EQUAL(built.size(), 2); + + auto msg = e.execute_cql("select count(*) from vcf1 where v = 0").get0(); + assert_that(msg).is_rows().with_size(1); + assert_that(msg).is_rows().with_rows({{{long_type->decompose(4000L)}}}); + + msg = e.execute_cql("select count(*) from vcf2 where v = 0").get0(); + assert_that(msg).is_rows().with_size(1); + assert_that(msg).is_rows().with_rows({{{long_type->decompose(4000L)}}}); + }); +} + +SEASTAR_TEST_CASE(test_builder_across_tokens_with_small_partitions) { + return do_with_cql_env_thread([] (cql_test_env& e) { + auto gen = random_generator(); + + e.execute_cql("create table cf (p blob, c int, v int, primary key (p, c))").get(); + auto s = e.local_db().find_schema("ks", "cf"); + + auto make_key = [&] (auto) { return to_hex(random_bytes(128, gen)); }; + for (auto&& k : boost::irange(0, 1000) | boost::adaptors::transformed(make_key)) { + for (auto i = 0; i < 4; ++i) { + e.execute_cql(sprint("insert into cf (p, c, v) values (0x%s, %d, 0)", k, i)).get(); + } + } + + auto f1 = e.local_view_builder().wait_until_built("ks", "vcf1", lowres_clock::now() + 60s); + auto f2 = e.local_view_builder().wait_until_built("ks", "vcf2", lowres_clock::now() + 30s); + + e.execute_cql("create materialized view vcf1 as select * from cf " + "where p is not null and c is not null and v is not null " + "primary key (v, c, p)").get(); + + sleep(1s).get(); + + e.execute_cql("create materialized view vcf2 as select * from cf " + "where p is not null and c is not null and v is not null " + "primary key (v, p, c)").get(); + + f2.get(); + f1.get(); + + auto built = db::system_keyspace::load_built_views().get0(); + BOOST_REQUIRE_EQUAL(built.size(), 2); + + auto msg = e.execute_cql("select count(*) from vcf1 where v = 0").get0(); + assert_that(msg).is_rows().with_size(1); + assert_that(msg).is_rows().with_rows({{{long_type->decompose(4000L)}}}); + + msg = e.execute_cql("select count(*) from vcf2 where v = 0").get0(); + assert_that(msg).is_rows().with_size(1); + assert_that(msg).is_rows().with_rows({{{long_type->decompose(4000L)}}}); + }); +} + +SEASTAR_TEST_CASE(test_builder_with_tombstones) { + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("create table cf (p int, c1 int, c2 int, v int, primary key (p, c1, c2))").get(); + + for (auto i = 0; i < 100; ++i) { + e.execute_cql(sprint("insert into cf (p, c1, c2, v) values (0, %d, %d, 1)", i % 2, i)).get(); + } + + e.execute_cql("delete from cf where p = 0 and c1 = 0").get(); + e.execute_cql("delete from cf where p = 0 and c1 = 1 and c2 >= 50 and c2 < 101").get(); + + auto f = e.local_view_builder().wait_until_built("ks", "vcf", lowres_clock::now() + 30s); + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and c1 is not null and c2 is not null and v is not null " + "primary key ((v, p), c1, c2)").get(); + + f.get(); + auto built = db::system_keyspace::load_built_views().get0(); + BOOST_REQUIRE_EQUAL(built.size(), 1); + + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_size(25); + }); +} + +SEASTAR_TEST_CASE(test_builder_with_concurrent_writes) { + return do_with_cql_env_thread([] (cql_test_env& e) { + auto gen = random_generator(); + + e.execute_cql("create table cf (p blob, c int, v int, primary key (p, c))").get(); + + const size_t rows = 6864; + const size_t rows_per_partition = 4; + const size_t partitions = rows / rows_per_partition; + + std::unordered_set keys; + while (keys.size() != partitions) { + keys.insert(to_hex(random_bytes(128, gen))); + } + + auto half = keys.begin(); + std::advance(half, keys.size() / 2); + auto k = keys.begin(); + for (; k != half; ++k) { + for (size_t i = 0; i < rows_per_partition; ++i) { + e.execute_cql(sprint("insert into cf (p, c, v) values (0x%s, %d, 0)", *k, i)).get(); + } + } + + auto f = e.local_view_builder().wait_until_built("ks", "vcf", lowres_clock::now() + 60s); + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and c is not null and v is not null " + "primary key (v, c, p)").get(); + + for (; k != keys.end(); ++k) { + for (size_t i = 0; i < rows_per_partition; ++i) { + e.execute_cql(sprint("insert into cf (p, c, v) values (0x%s, %d, 0)", *k, i)).get(); + } + } + + f.get(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_size(rows); + }); + }); +} + +SEASTAR_TEST_CASE(test_builder_with_concurrent_drop) { + return do_with_cql_env_thread([] (cql_test_env& e) { + auto gen = random_generator(); + + e.execute_cql("create table cf (p blob, c int, v int, primary key (p, c))").get(); + + auto make_key = [&] (auto) { return to_hex(random_bytes(128, gen)); }; + for (auto&& k : boost::irange(0, 1000) | boost::adaptors::transformed(make_key)) { + for (auto i = 0; i < 5; ++i) { + e.execute_cql(sprint("insert into cf (p, c, v) values (0x%s, %d, 0)", k, i)).get(); + } + } + + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and c is not null and v is not null " + "primary key (v, c, p)").get(); + + e.execute_cql("drop materialized view vcf").get(); + + eventually([&] { + auto msg = e.execute_cql("select * from system.scylla_views_builds_in_progress").get0(); + assert_that(msg).is_rows().is_empty(); + msg = e.execute_cql("select * from system.built_views").get0(); + assert_that(msg).is_rows().is_empty(); + msg = e.execute_cql("select * from system.views_builds_in_progress").get0(); + assert_that(msg).is_rows().is_empty(); + msg = e.execute_cql("select * from system_distributed.view_build_status").get0(); + assert_that(msg).is_rows().is_empty(); + }); + }); +}