diff --git a/api/api-doc/storage_service.json b/api/api-doc/storage_service.json index f4ffb0f4d4..5756c7e898 100644 --- a/api/api-doc/storage_service.json +++ b/api/api-doc/storage_service.json @@ -2129,6 +2129,41 @@ ] } ] + }, + { + "path":"/storage_service/view_build_statuses/{keyspace}/{view}", + "operations":[ + { + "method":"GET", + "summary":"Gets the progress of a materialized view build", + "type":"array", + "items":{ + "type":"mapper" + }, + "nickname":"view_build_statuses", + "produces":[ + "application/json" + ], + "parameters":[ + { + "name":"keyspace", + "description":"The keyspace", + "required":true, + "allowMultiple":false, + "type":"string", + "paramType":"path" + }, + { + "name":"view", + "description":"View name", + "required":true, + "allowMultiple":false, + "type":"string", + "paramType":"path" + } + ] + } + ] } ], "models":{ diff --git a/api/storage_service.cc b/api/storage_service.cc index 4a9c6bee3b..385bd9303f 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -852,6 +852,15 @@ void set_storage_service(http_context& ctx, routes& r) { return make_ready_future(map_to_key_value(ownership, res)); }); }); + + ss::view_build_statuses.set(r, [&ctx] (std::unique_ptr req) { + auto keyspace = validate_keyspace(ctx, req->param); + auto view = req->param["view"]; + return service::get_local_storage_service().view_build_statuses(std::move(keyspace), std::move(view)).then([] (std::unordered_map status) { + std::vector res; + return make_ready_future(map_to_key_value(std::move(status), res)); + }); + }); } } diff --git a/configure.py b/configure.py index 21f4fffea5..593783a93a 100755 --- a/configure.py +++ b/configure.py @@ -273,6 +273,7 @@ scylla_tests = [ 'tests/input_stream_test', 'tests/virtual_reader_test', 'tests/view_schema_test', + 'tests/view_build_test', 'tests/counter_test', 'tests/cell_locker_test', 'tests/row_locker_test', @@ -491,6 +492,7 @@ scylla_core = (['database.cc', 'cql3/variable_specifications.cc', 
'db/consistency_level.cc', 'db/system_keyspace.cc', + 'db/system_distributed_keyspace.cc', 'db/schema_tables.cc', 'db/cql_type_parser.cc', 'db/legacy_schema_migrator.cc', diff --git a/database.cc b/database.cc index fc1afa3326..1a983488d3 100644 --- a/database.cc +++ b/database.cc @@ -2914,6 +2914,11 @@ bool database::has_schema(const sstring& ks_name, const sstring& cf_name) const return _ks_cf_to_uuid.count(std::make_pair(ks_name, cf_name)) > 0; } +std::vector database::get_views() const { + return boost::copy_range>(get_non_system_column_families() + | boost::adaptors::filtered([] (auto& cf) { return cf->schema()->is_view(); }) + | boost::adaptors::transformed([] (auto& cf) { return view_ptr(cf->schema()); })); +} void database::create_in_memory_keyspace(const lw_shared_ptr& ksm) { keyspace ks(ksm, std::move(make_keyspace_config(*ksm))); @@ -3259,7 +3264,7 @@ future database::do_apply_counter_update(column_family& cf, const froz std::move(regular_columns), { }, { }, cql_serialization_format::internal(), query::max_rows); return do_with(std::move(slice), std::move(m), std::vector(), - [this, &cf, timeout, trace_state = std::move(trace_state)] (const query::partition_slice& slice, mutation& m, std::vector& locks) mutable { + [this, &cf, timeout, trace_state = std::move(trace_state), op = cf.write_in_progress()] (const query::partition_slice& slice, mutation& m, std::vector& locks) mutable { tracing::trace(trace_state, "Acquiring counter locks"); return cf.lock_counter_cells(m, timeout).then([&, m_schema = cf.schema(), trace_state = std::move(trace_state), timeout, this] (std::vector lcs) mutable { locks = std::move(lcs); @@ -3491,16 +3496,19 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, db::timeout_ throw std::runtime_error(sprint("attempted to mutate using not synced schema of %s.%s, version=%s", s->ks_name(), s->cf_name(), s->version())); } + + // Signal to view building code that a write is in progress, + // so it knows when new writes 
start being sent to a new view. + auto op = cf.write_in_progress(); if (cf.views().empty()) { - return apply_with_commitlog(std::move(s), cf, std::move(uuid), m, timeout); + return apply_with_commitlog(std::move(s), cf, std::move(uuid), m, timeout).finally([op = std::move(op)] { }); } future f = cf.push_view_replica_updates(s, m); - return f.then([this, s = std::move(s), uuid = std::move(uuid), &m, timeout] (row_locker::lock_holder lock) { - auto& cf = find_column_family(uuid); + return f.then([this, s = std::move(s), uuid = std::move(uuid), &m, timeout, &cf, op = std::move(op)] (row_locker::lock_holder lock) mutable { return apply_with_commitlog(std::move(s), cf, std::move(uuid), m, timeout).finally( // Hold the local lock on the base-table partition or row // taken before the read, until the update is done. - [lock = std::move(lock)] { }); + [lock = std::move(lock), op = std::move(op)] { }); }); }); } @@ -4265,9 +4273,10 @@ void column_family::set_schema(schema_ptr s) { static std::vector::iterator find_view(std::vector& views, const view_ptr& v) { return std::find_if(views.begin(), views.end(), [&v] (auto&& e) { - return e->cf_name() == v->cf_name(); + return e->id() == v->id(); }); } + void column_family::add_or_update_view(view_ptr v) { auto existing = find_view(_views, v); if (existing != _views.end()) { @@ -4317,7 +4326,7 @@ future<> column_family::generate_and_propagate_view_updates(const schema_ptr& ba std::move(views), flat_mutation_reader_from_mutations({std::move(m)}), std::move(existings)).then([base_token = std::move(base_token)] (auto&& updates) { - db::view::mutate_MV(std::move(base_token), std::move(updates)); + db::view::mutate_MV(std::move(base_token), std::move(updates)).handle_exception([] (auto ignored) { }); }); } @@ -4456,6 +4465,31 @@ column_family::local_base_lock(const schema_ptr& s, const dht::decorated_key& pk } } +/** + * Given some updates on the base table and assuming there are no pre-existing, overlapping updates, + * generates the 
mutations to be applied to the base table's views, and sends them to the paired + * view replicas. The future resolves when the updates have been acknowledged by the replicas, i.e., + * propagating the view updates to the view replicas happens synchronously. + * + * @param views the affected views which need to be updated. + * @param base_token The token to use to match the base replica with the paired replicas. + * @param reader the base table updates being applied, which all correspond to the base token. + * @return a future that resolves when the updates have been acknowledged by the view replicas + */ +future<> column_family::populate_views( + std::vector views, + dht::token base_token, + flat_mutation_reader&& reader) { + auto& schema = reader.schema(); + return db::view::generate_view_updates( + schema, + std::move(views), + std::move(reader), + { }).then([base_token = std::move(base_token)] (auto&& updates) { + return db::view::mutate_MV(std::move(base_token), std::move(updates)); + }); +} + void column_family::set_hit_rate(gms::inet_address addr, cache_temperature rate) { auto& e = _cluster_cache_hit_rates[addr]; e.rate = rate; diff --git a/database.hh b/database.hh index ee49939b7b..7cf2b088ff 100644 --- a/database.hh +++ b/database.hh @@ -461,6 +461,11 @@ private: double _cached_percentile = -1; lowres_clock::time_point _percentile_cache_timestamp; std::chrono::milliseconds _percentile_cache_value; + + // Phaser used to synchronize with in-progress writes. This is useful for code that, + // after some modification, needs to ensure that new writes will see it before + // it can proceed, such as the view building code.
+ utils::phased_barrier _pending_writes_phaser; private: void update_stats_for_new_sstable(uint64_t disk_space_used_by_sstable, const std::vector& shards_for_the_sstable) noexcept; // Adds new sstable to the set of sstables @@ -784,6 +789,14 @@ public: future<> run_with_compaction_disabled(std::function ()> func); + utils::phased_barrier::operation write_in_progress() { + return _pending_writes_phaser.start(); + } + + future<> await_pending_writes() { + return _pending_writes_phaser.advance_and_await(); + } + void add_or_update_view(view_ptr v); void remove_view(view_ptr v); const std::vector& views() const; @@ -798,6 +811,12 @@ public: uint64_t large_partition_warning_threshold_bytes() const { return _config.large_partition_warning_threshold_bytes; } + + future<> populate_views( + std::vector, + dht::token base_token, + flat_mutation_reader&&); + private: std::vector affected_views(const schema_ptr& base, const mutation& update) const; future<> generate_and_propagate_view_updates(const schema_ptr& base, @@ -1273,6 +1292,8 @@ public: std::vector> get_non_system_column_families() const; + std::vector get_views() const; + const std::unordered_map, utils::UUID, utils::tuple_hash>& get_column_families_mapping() const { return _ks_cf_to_uuid; diff --git a/db/system_distributed_keyspace.cc b/db/system_distributed_keyspace.cc new file mode 100644 index 0000000000..a09ff336ab --- /dev/null +++ b/db/system_distributed_keyspace.cc @@ -0,0 +1,143 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "db/system_distributed_keyspace.hh" + +#include "cql3/untyped_result_set.hh" +#include "database.hh" +#include "db/consistency_level_type.hh" +#include "db/system_keyspace.hh" +#include "schema_builder.hh" +#include "types.hh" + +#include +#include + +#include + +#include +#include +#include + +namespace db { + +schema_ptr view_build_status() { + static thread_local auto schema = [] { + auto id = generate_legacy_id(system_distributed_keyspace::NAME, system_distributed_keyspace::VIEW_BUILD_STATUS); + return schema_builder(system_distributed_keyspace::NAME, system_distributed_keyspace::VIEW_BUILD_STATUS, std::experimental::make_optional(id)) + .with_column("keyspace_name", utf8_type, column_kind::partition_key) + .with_column("view_name", utf8_type, column_kind::partition_key) + .with_column("host_id", uuid_type, column_kind::clustering_key) + .with_column("status", utf8_type) + .with_version(system_keyspace::generate_schema_version(id)) + .build(); + }(); + return schema; +} + +static std::vector all_tables() { + return { + view_build_status(), + }; +} + +system_distributed_keyspace::system_distributed_keyspace(cql3::query_processor& qp, service::migration_manager& mm) + : _qp(qp) + , _mm(mm) { +} + +future<> system_distributed_keyspace::start() { + if (engine().cpu_id() != 0) { + return make_ready_future<>(); + } + + static auto ignore_existing = [] (seastar::noncopyable_function()> func) { + return futurize_apply(std::move(func)).handle_exception_type([] (exceptions::already_exists_exception& ignored) { }); + }; + + // We use min_timestamp so that the default keyspace metadata will lose with any manual adjustments. + // See issue #2129. 
+ return ignore_existing([this] { + auto ksm = keyspace_metadata::new_keyspace( + NAME, + "org.apache.cassandra.locator.SimpleStrategy", + {{"replication_factor", "3"}}, + true); + return _mm.announce_new_keyspace(ksm, api::min_timestamp, false); + }).then([this] { + return do_with(all_tables(), [this] (std::vector& tables) { + return do_for_each(tables, [this] (schema_ptr table) { + return ignore_existing([this, table = std::move(table)] { + return _mm.announce_new_column_family(std::move(table), false); + }); + }); + }); + }); +} + +future<> system_distributed_keyspace::stop() { + return make_ready_future<>(); +} + +future> system_distributed_keyspace::view_status(sstring ks_name, sstring view_name) const { + return _qp.process( + sprint("SELECT host_id, status FROM %s.%s WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS), + db::consistency_level::ONE, + { std::move(ks_name), std::move(view_name) }, + false).then([this] (::shared_ptr cql_result) { + return boost::copy_range>(*cql_result + | boost::adaptors::transformed([] (const cql3::untyped_result_set::row& row) { + auto host_id = row.get_as("host_id"); + auto status = row.get_as("status"); + return std::pair(std::move(host_id), std::move(status)); + })); + }); +} + +future<> system_distributed_keyspace::start_view_build(sstring ks_name, sstring view_name) const { + return db::system_keyspace::get_local_host_id().then([this, ks_name = std::move(ks_name), view_name = std::move(view_name)] (utils::UUID host_id) { + return _qp.process( + sprint("INSERT INTO %s.%s (keyspace_name, view_name, host_id, status) VALUES (?, ?, ?, ?)", NAME, VIEW_BUILD_STATUS), + db::consistency_level::ONE, + { std::move(ks_name), std::move(view_name), std::move(host_id), "STARTED" }, + false).discard_result(); + }); +} + +future<> system_distributed_keyspace::finish_view_build(sstring ks_name, sstring view_name) const { + return db::system_keyspace::get_local_host_id().then([this, ks_name = std::move(ks_name), view_name 
= std::move(view_name)] (utils::UUID host_id) { + return _qp.process( + sprint("UPDATE %s.%s SET status = ? WHERE keyspace_name = ? AND view_name = ? AND host_id = ?", NAME, VIEW_BUILD_STATUS), + db::consistency_level::ONE, + { "SUCCESS", std::move(ks_name), std::move(view_name), std::move(host_id) }, + false).discard_result(); + }); +} + +future<> system_distributed_keyspace::remove_view(sstring ks_name, sstring view_name) const { + return _qp.process( + sprint("DELETE FROM %s.%s WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS), + db::consistency_level::ONE, + { std::move(ks_name), std::move(view_name) }, + false).discard_result(); +} + +} \ No newline at end of file diff --git a/db/system_distributed_keyspace.hh b/db/system_distributed_keyspace.hh new file mode 100644 index 0000000000..39369c1e09 --- /dev/null +++ b/db/system_distributed_keyspace.hh @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . 
+ */ + +#pragma once + +#include "bytes.hh" +#include "cql3/query_processor.hh" +#include "schema.hh" +#include "service/migration_manager.hh" +#include "utils/UUID.hh" + +#include +#include + +#include + +namespace db { + +class system_distributed_keyspace { +public: + static constexpr auto NAME = "system_distributed"; + static constexpr auto VIEW_BUILD_STATUS = "view_build_status"; + +private: + cql3::query_processor& _qp; + service::migration_manager& _mm; + +public: + system_distributed_keyspace(cql3::query_processor&, service::migration_manager&); + + future<> start(); + future<> stop(); + + future> view_status(sstring ks_name, sstring view_name) const; + future<> start_view_build(sstring ks_name, sstring view_name) const; + future<> finish_view_build(sstring ks_name, sstring view_name) const; + future<> remove_view(sstring ks_name, sstring view_name) const; +}; + +} \ No newline at end of file diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index fdc6086109..826fb7f569 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -74,6 +74,7 @@ #include "db/size_estimates_virtual_reader.hh" #include "db/timeout_clock.hh" #include "sstables/sstables.hh" +#include "db/view/build_progress_virtual_reader.hh" #include "db/schema_tables.hh" using days = std::chrono::duration>; @@ -642,6 +643,22 @@ schema_ptr built_views() { return schema; } +schema_ptr scylla_views_builds_in_progress() { + static thread_local auto schema = [] { + auto id = generate_legacy_id(NAME, SCYLLA_VIEWS_BUILDS_IN_PROGRESS); + return schema_builder(NAME, SCYLLA_VIEWS_BUILDS_IN_PROGRESS, stdx::make_optional(id)) + .with_column("keyspace_name", utf8_type, column_kind::partition_key) + .with_column("view_name", utf8_type, column_kind::clustering_key) + .with_column("cpu_id", int32_type, column_kind::clustering_key) + .with_column("next_token", utf8_type) + .with_column("generation_number", int32_type) + .with_column("first_token", utf8_type) + 
.with_version(generate_schema_version(id)) + .build(); + }(); + return schema; +} + } // namespace legacy { @@ -1541,7 +1558,8 @@ std::vector all_tables() { r.insert(r.end(), { built_indexes(), hints(), batchlog(), paxos(), local(), peers(), peer_events(), range_xfers(), compactions_in_progress(), compaction_history(), - sstable_activity(), size_estimates(), + sstable_activity(), size_estimates(), v3::views_builds_in_progress(), v3::built_views(), + v3::scylla_views_builds_in_progress(), }); // legacy schema r.insert(r.end(), { @@ -1558,6 +1576,9 @@ static void maybe_add_virtual_reader(schema_ptr s, database& db) { if (s.get() == size_estimates().get()) { db.find_column_family(s).set_virtual_reader(mutation_source(db::size_estimates::virtual_reader())); } + if (s.get() == v3::views_builds_in_progress().get()) { + db.find_column_family(s).set_virtual_reader(mutation_source(db::view::build_progress_virtual_reader(db))); + } } static bool maybe_write_in_user_memory(schema_ptr s, database& db) { @@ -1783,6 +1804,85 @@ mutation make_size_estimates_mutation(const sstring& ks, std::vector register_view_for_building(sstring ks_name, sstring view_name, const dht::token& token) { + sstring req = sprint("INSERT INTO system.%s (keyspace_name, view_name, generation_number, cpu_id, first_token) VALUES (?, ?, ?, ?, ?)", + v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS); + return execute_cql( + std::move(req), + std::move(ks_name), + std::move(view_name), + 0, + int32_t(engine().cpu_id()), + dht::global_partitioner().to_sstring(token)).discard_result(); +} + +future<> update_view_build_progress(sstring ks_name, sstring view_name, const dht::token& token) { + sstring req = sprint("INSERT INTO system.%s (keyspace_name, view_name, next_token, cpu_id) VALUES (?, ?, ?, ?)", + v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS); + return execute_cql( + std::move(req), + std::move(ks_name), + std::move(view_name), + dht::global_partitioner().to_sstring(token), + int32_t(engine().cpu_id())).discard_result(); +} + 
+future<> remove_view_build_progress_across_all_shards(sstring ks_name, sstring view_name) { + return execute_cql( + sprint("DELETE FROM system.%s WHERE keyspace_name = ? AND view_name = ?", v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS), + std::move(ks_name), + std::move(view_name)).discard_result(); +} + +future<> mark_view_as_built(sstring ks_name, sstring view_name) { + return execute_cql( + sprint("INSERT INTO system.%s (keyspace_name, view_name) VALUES (?, ?)", v3::BUILT_VIEWS), + std::move(ks_name), + std::move(view_name)).discard_result(); +} + +future<> remove_built_view(sstring ks_name, sstring view_name) { + return execute_cql( + sprint("DELETE FROM system.%s WHERE keyspace_name = ? AND view_name = ?", v3::BUILT_VIEWS), + std::move(ks_name), + std::move(view_name)).discard_result(); +} + +future> load_built_views() { + return execute_cql(sprint("SELECT * FROM system.%s", v3::BUILT_VIEWS)).then([] (::shared_ptr cql_result) { + return boost::copy_range>(*cql_result + | boost::adaptors::transformed([] (const cql3::untyped_result_set::row& row) { + auto ks_name = row.get_as("keyspace_name"); + auto cf_name = row.get_as("view_name"); + return std::pair(std::move(ks_name), std::move(cf_name)); + })); + }); +} + +future> load_view_build_progress() { + return execute_cql(sprint("SELECT keyspace_name, view_name, first_token, next_token, cpu_id FROM system.%s", + v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS)).then([] (::shared_ptr cql_result) { + std::vector progress; + for (auto& row : *cql_result) { + auto ks_name = row.get_as("keyspace_name"); + auto cf_name = row.get_as("view_name"); + auto first_token = dht::global_partitioner().from_sstring(row.get_as("first_token")); + auto next_token_sstring = row.get_opt("next_token"); + std::optional next_token; + if (next_token_sstring) { + next_token = dht::global_partitioner().from_sstring(std::move(next_token_sstring).value()); + } + auto cpu_id = row.get_as("cpu_id"); + progress.emplace_back(view_build_progress{ + 
view_name(std::move(ks_name), std::move(cf_name)), + std::move(first_token), + std::move(next_token), + static_cast(cpu_id)}); + } + return progress; + }); +} + } // namespace system_keyspace sstring system_keyspace_name() { diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 1ad8dac1a3..441e832ca3 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -40,8 +40,10 @@ #pragma once +#include #include #include +#include #include "schema.hh" #include "utils/UUID.hh" #include "gms/inet_address.hh" @@ -99,6 +101,7 @@ static constexpr auto SIZE_ESTIMATES = "size_estimates"; static constexpr auto AVAILABLE_RANGES = "available_ranges"; static constexpr auto VIEWS_BUILDS_IN_PROGRESS = "views_builds_in_progress"; static constexpr auto BUILT_VIEWS = "built_views"; +static constexpr auto SCYLLA_VIEWS_BUILDS_IN_PROGRESS = "scylla_views_builds_in_progress"; } namespace legacy { @@ -122,6 +125,14 @@ struct range_estimates { int64_t mean_partition_size; }; +using view_name = std::pair; +struct view_build_progress { + view_name view; + dht::token first_token; + std::optional next_token; + shard_id cpu_id; +}; + extern schema_ptr hints(); extern schema_ptr batchlog(); extern schema_ptr paxos(); @@ -652,5 +663,13 @@ future<> set_bootstrap_state(bootstrap_state state); */ mutation make_size_estimates_mutation(const sstring& ks, std::vector estimates); +future<> register_view_for_building(sstring ks_name, sstring view_name, const dht::token& token); +future<> update_view_build_progress(sstring ks_name, sstring view_name, const dht::token& token); +future<> remove_view_build_progress_across_all_shards(sstring ks_name, sstring view_name); +future<> mark_view_as_built(sstring ks_name, sstring view_name); +future<> remove_built_view(sstring ks_name, sstring view_name); +future> load_built_views(); +future> load_view_build_progress(); + } // namespace system_keyspace } // namespace db diff --git a/db/view/build_progress_virtual_reader.hh 
b/db/view/build_progress_virtual_reader.hh new file mode 100644 index 0000000000..28e267ad9c --- /dev/null +++ b/db/view/build_progress_virtual_reader.hh @@ -0,0 +1,195 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "database.hh" +#include "db/system_keyspace.hh" +#include "db/timeout_clock.hh" +#include "dht/i_partitioner.hh" +#include "flat_mutation_reader.hh" +#include "mutation_fragment.hh" +#include "mutation_reader.hh" +#include "query-request.hh" +#include "schema.hh" +#include "tracing/tracing.hh" + +#include + +#include +#include + +namespace db::view { + +// Allows a user to query the views_builds_in_progress system table +// in terms of the scylla_views_builds_in_progress one, which is +// a superset of the former. When querying, we don't have to adjust +// the clustering key, but we have to adjust the requested regular +// columns. When reading the results from the scylla_views_builds_in_progress +// table, we adjust the clustering key (we shed the cpu_id column) and map +// back the regular columns. 
+class build_progress_virtual_reader { + database& _db; + + struct build_progress_reader : flat_mutation_reader::impl { + column_id _scylla_next_token_col; + column_id _scylla_generation_number_col; + column_id _legacy_last_token_col; + column_id _legacy_generation_number_col; + const query::partition_slice& _legacy_slice; + query::partition_slice _slice; + flat_mutation_reader _underlying; + + build_progress_reader( + schema_ptr legacy_schema, + column_family& scylla_views_build_progress, + const dht::partition_range& range, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr) + : flat_mutation_reader::impl(std::move(legacy_schema)) + , _scylla_next_token_col(scylla_views_build_progress.schema()->get_column_definition("next_token")->id) + , _scylla_generation_number_col(scylla_views_build_progress.schema()->get_column_definition("generation_number")->id) + , _legacy_last_token_col(_schema->get_column_definition("last_token")->id) + , _legacy_generation_number_col(_schema->get_column_definition("generation_number")->id) + , _legacy_slice(slice) + , _slice(adjust_partition_slice()) + , _underlying(scylla_views_build_progress.make_reader( + scylla_views_build_progress.schema(), + range, + slice, + pc, + std::move(trace_state), + fwd, + fwd_mr)) { + } + + const schema& underlying_schema() const { + return *_underlying.schema(); + } + + query::partition_slice adjust_partition_slice() { + auto slice = _legacy_slice; + std::vector adjusted_columns; + for (auto col_id : slice.regular_columns) { + if (col_id == _legacy_last_token_col) { + adjusted_columns.push_back(_scylla_next_token_col); + } else if (col_id == _legacy_generation_number_col) { + adjusted_columns.push_back(_scylla_generation_number_col); + } + } + slice.regular_columns = std::move(adjusted_columns); + return slice; + } + + clustering_key adjust_ckey(clustering_key& ck) { + if 
(ck.size(underlying_schema()) < 3) { + return std::move(ck); + } + // Drop the cpu_id from the clustering key + auto end = ck.begin(*_schema); + std::advance(end, 1); + auto r = boost::make_iterator_range(ck.begin(*_schema), std::move(end)); + return clustering_key_prefix::from_exploded(r); + } + + virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override { + return _underlying.fill_buffer(timeout).then([this] { + _end_of_stream = _underlying.is_end_of_stream(); + while (!_underlying.is_buffer_empty()) { + auto mf = _underlying.pop_mutation_fragment(); + if (mf.is_clustering_row()) { + auto scylla_in_progress_row = std::move(mf).as_clustering_row(); + auto legacy_in_progress_row = row(); + // Drop the first_token from the regular columns + scylla_in_progress_row.cells().for_each_cell([&, this] (column_id id, atomic_cell_or_collection& c) { + if (id == _scylla_next_token_col) { + legacy_in_progress_row.append_cell(_legacy_last_token_col, std::move(c)); + } else if (id == _scylla_generation_number_col) { + legacy_in_progress_row.append_cell(_legacy_generation_number_col, std::move(c)); + } + }); + mf = clustering_row( + adjust_ckey(scylla_in_progress_row.key()), + std::move(scylla_in_progress_row.tomb()), + std::move(scylla_in_progress_row.marker()), + std::move(legacy_in_progress_row)); + } else if (mf.is_range_tombstone()) { + auto scylla_in_progress_rt = std::move(mf).as_range_tombstone(); + mf = range_tombstone( + adjust_ckey(scylla_in_progress_rt.start), + scylla_in_progress_rt.start_kind, + scylla_in_progress_rt.end, + scylla_in_progress_rt.end_kind, + scylla_in_progress_rt.tomb); + } + push_mutation_fragment(mf); + } + }); + } + + virtual void next_partition() override { + _end_of_stream = false; + clear_buffer_to_next_partition(); + if (is_buffer_empty()) { + _underlying.next_partition(); + } + } + + virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { + clear_buffer(); + 
_end_of_stream = false; + return _underlying.fast_forward_to(pr, timeout); + } + + virtual future<> fast_forward_to(position_range range, db::timeout_clock::time_point timeout) override { + forward_buffer_to(range.start()); + _end_of_stream = false; + return _underlying.fast_forward_to(std::move(range), timeout); + } + }; + +public: + build_progress_virtual_reader(database& db) + : _db(db) { + } + + flat_mutation_reader operator()( + schema_ptr s, + const dht::partition_range& range, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr) { + return flat_mutation_reader(std::make_unique( + std::move(s), + _db.find_column_family(s->ks_name(), system_keyspace::v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS), + range, + slice, + pc, + std::move(trace_state), + fwd, + fwd_mr)); + } +}; + +} \ No newline at end of file diff --git a/db/view/view.cc b/db/view/view.cc index de21a483c7..5b3e135c08 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -39,22 +39,36 @@ * along with Scylla. If not, see . */ -#include +#include #include +#include +#include +#include +#include +#include #include #include +#include + +#include "database.hh" #include "clustering_bounds_comparator.hh" #include "cql3/statements/select_statement.hh" #include "cql3/util.hh" #include "db/view/view.hh" +#include "db/view/view_builder.hh" #include "gms/inet_address.hh" #include "keys.hh" #include "locator/network_topology_strategy.hh" +#include "mutation.hh" +#include "mutation_partition.hh" +#include "service/migration_manager.hh" #include "service/storage_service.hh" #include "view_info.hh" +using namespace std::chrono_literals; + static logging::logger vlogger("view"); view_info::view_info(const schema& schema, const raw_view_info& raw_view_info) @@ -791,7 +805,7 @@ get_view_natural_endpoint(const sstring& keyspace_name, // for the writes to complete. 
// FIXME: I dropped a lot of parameters the Cassandra version had, // we may need them back: writeCommitLog, baseComplete, queryStartNanoTime. -void mutate_MV(const dht::token& base_token, +future<> mutate_MV(const dht::token& base_token, std::vector mutations) { #if 0 @@ -823,6 +837,7 @@ void mutate_MV(const dht::token& base_token, () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID)); // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet #endif + auto fs = std::make_unique>>(); for (auto& mut : mutations) { auto view_token = mut.token(); auto keyspace_name = mut.schema()->ks_name(); @@ -840,9 +855,10 @@ void mutate_MV(const dht::token& base_token, // do not wait for it to complete. // Note also that mutate_locally(mut) copies mut (in // frozen form) so don't need to increase its lifetime. - service::get_local_storage_proxy().mutate_locally(mut).handle_exception([] (auto ep) { + fs->push_back(service::get_local_storage_proxy().mutate_locally(mut).handle_exception([] (auto ep) { vlogger.error("Error applying local view update: {}", ep); - }); + return make_exception_future<>(std::move(ep)); + })); } else { vlogger.debug("Sending view update to endpoint {}, with pending endpoints = {}", *paired_endpoint, pending_endpoints); // Note we don't wait for the asynchronous operation to complete @@ -852,9 +868,10 @@ void mutate_MV(const dht::token& base_token, // to send the update there. 
Currently, we do this from *each* of // the base replicas, but this is probably excessive - see // See https://issues.apache.org/jira/browse/CASSANDRA-14262/ - service::get_local_storage_proxy().send_to_endpoint(std::move(mut), *paired_endpoint, std::move(pending_endpoints), db::write_type::VIEW).handle_exception([paired_endpoint] (auto ep) { + fs->push_back(service::get_local_storage_proxy().send_to_endpoint(std::move(mut), *paired_endpoint, std::move(pending_endpoints), db::write_type::VIEW).handle_exception([paired_endpoint] (auto ep) { vlogger.error("Error applying view update to {}: {}", *paired_endpoint, ep); - });; + return make_exception_future<>(std::move(ep)); + })); } } else { #if 0 @@ -897,8 +914,588 @@ void mutate_MV(const dht::token& base_token, viewWriteMetrics.addNano(System.nanoTime() - startTime); } #endif + auto f = seastar::when_all_succeed(fs->begin(), fs->end()); + return f.finally([fs = std::move(fs)] { }); +} + +view_builder::view_builder(database& db, db::system_distributed_keyspace& sys_dist_ks, service::migration_manager& mm) + : _db(db) + , _sys_dist_ks(sys_dist_ks) + , _mm(mm) { +} + +future<> view_builder::start() { + return seastar::async([this] { + auto built = system_keyspace::load_built_views().get0(); + auto in_progress = system_keyspace::load_view_build_progress().get0(); + calculate_shard_build_step(std::move(built), std::move(in_progress)).get(); + _mm.register_listener(this); + _current_step = _base_to_build_step.begin(); + _build_step.trigger(); + }); +} + +future<> view_builder::stop() { + vlogger.info("Stopping view builder"); + _mm.unregister_listener(this); + _as.request_abort(); + return _sem.wait().then([this] { + _sem.broken(); + return _build_step.join(); + }); +} + +static query::partition_slice make_partition_slice(const schema& s) { + query::partition_slice::option_set opts; + opts.set(query::partition_slice::option::send_partition_key); + opts.set(query::partition_slice::option::send_clustering_key); + 
opts.set(query::partition_slice::option::send_timestamp); + opts.set(query::partition_slice::option::send_ttl); + return query::partition_slice( + {query::full_clustering_range}, + { }, + boost::copy_range>(s.regular_columns() + | boost::adaptors::transformed(std::mem_fn(&column_definition::id))), + std::move(opts)); +} + +view_builder::build_step& view_builder::get_or_create_build_step(utils::UUID base_id) { + auto it = _base_to_build_step.find(base_id); + if (it == _base_to_build_step.end()) { + auto base = _db.find_column_family(base_id).shared_from_this(); + auto p = _base_to_build_step.emplace(base_id, build_step{base, make_partition_slice(*base->schema())}); + // Iterators could have been invalidated if there was rehashing, so just reset the cursor. + _current_step = p.first; + it = p.first; + } + return it->second; +} + +void view_builder::initialize_reader_at_current_token(build_step& step) { + step.pslice = make_partition_slice(*step.base->schema()); + step.prange = dht::partition_range(dht::ring_position::starting_at(step.current_token()), dht::ring_position::max()); + step.reader = make_local_shard_sstable_reader( + step.base->schema(), + make_lw_shared(sstables::sstable_set(step.base->get_sstable_set())), + step.prange, + step.pslice, + default_priority_class(), + no_resource_tracking(), + nullptr, + streamed_mutation::forwarding::no, + mutation_reader::forwarding::no); +} + +void view_builder::load_view_status(view_builder::view_build_status status, std::unordered_set& loaded_views) { + if (!status.next_token) { + // No progress was made on this view, so we'll treat it as new. + return; + } + vlogger.info0("Resuming to build view {}.{} at {}", status.view->ks_name(), status.view->cf_name(), *status.next_token); + loaded_views.insert(status.view->id()); + if (status.first_token == *status.next_token) { + // Completed, so nothing to do for this shard. Consider the view + // as loaded and not as a new view. 
+ _built_views.emplace(status.view->id()); + return; + } + get_or_create_build_step(status.view->view_info()->base_id()).build_status.emplace_back(std::move(status)); +} + +void view_builder::reshard( + std::vector> view_build_status_per_shard, + std::unordered_set& loaded_views) { + // We must reshard. We aim for a simple algorithm, a step above not starting from scratch. + // Shards build entries at different paces, so both first and last tokens will differ. We + // want to be conservative when selecting the range that has been built. To do that, we + // select the intersection of all the previous shards' ranges for each view. + struct view_ptr_hash { + std::size_t operator()(const view_ptr& v) const noexcept { + return std::hash()(v->id()); + } + }; + struct view_ptr_equals { + bool operator()(const view_ptr& v1, const view_ptr& v2) const noexcept { + return v1->id() == v2->id(); + } + }; + std::unordered_map>, view_ptr_hash, view_ptr_equals> my_status; + for (auto& shard_status : view_build_status_per_shard) { + for (auto& [view, first_token, next_token] : shard_status ) { + // We start from an open-ended range, which we'll try to restrict. + auto& my_range = my_status.emplace( + std::move(view), + nonwrapping_range::make_open_ended_both_sides()).first->second; + if (!next_token || !my_range) { + // A previous shard made no progress, so for this view we'll start over. + my_range = stdx::nullopt; + continue; + } + if (first_token == *next_token) { + // Completed, so don't consider this shard's progress. We know that if the view + // is marked as in-progress, then at least one shard will have a non-full range. + continue; + } + wrapping_range other_range(first_token, *next_token); + if (other_range.is_wrap_around(dht::token_comparator())) { + // The intersection of a wrapping range with a non-wrapping range may yield + // multiple non-contiguous ranges.
To avoid the complexity of dealing with more + // than one range, we'll just take one of the intersections. + auto [bottom_range, top_range] = other_range.unwrap(); + if (auto bottom_int = my_range->intersection(nonwrapping_range(std::move(bottom_range)), dht::token_comparator())) { + my_range = std::move(bottom_int); + } else { + my_range = my_range->intersection(nonwrapping_range(std::move(top_range)), dht::token_comparator()); + } + } else { + my_range = my_range->intersection(nonwrapping_range(std::move(other_range)), dht::token_comparator()); + } + } + } + view_builder::base_to_build_step_type build_step; + for (auto& [view, opt_range] : my_status) { + if (!opt_range) { + continue; // Treat it as a new table. + } + auto start_bound = opt_range->start() ? std::move(opt_range->start()->value()) : dht::minimum_token(); + auto end_bound = opt_range->end() ? std::move(opt_range->end()->value()) : dht::minimum_token(); + auto s = view_build_status{std::move(view), std::move(start_bound), std::move(end_bound)}; + load_view_status(std::move(s), loaded_views); + } +} + +future<> view_builder::calculate_shard_build_step( + std::vector built, + std::vector in_progress) { + // Shard 0 makes cleanup changes to the system tables, but none that could conflict + // with the other shards; everyone is thus able to proceed independently. + auto bookkeeping_ops = std::make_unique>>(); + auto base_table_exists = [&, this] (const view_ptr& view) { + // This is a safety check in case this node missed a create MV statement + // but got a drop table for the base, and another node didn't get the + // drop notification and sent us the view schema. 
+ try { + _db.find_schema(view->view_info()->base_id()); + return true; + } catch (const no_such_column_family&) { + return false; + } + }; + auto maybe_fetch_view = [&, this] (system_keyspace::view_name& name) { + try { + auto s = _db.find_schema(name.first, name.second); + if (s->is_view()) { + auto view = view_ptr(std::move(s)); + if (base_table_exists(view)) { + return view; + } + } + // The view was dropped and a table was re-created with the same name, + // but the write to the view-related system tables didn't make it. + } catch (const no_such_column_family&) { + // Fall-through + } + if (engine().cpu_id() == 0) { + bookkeeping_ops->push_back(_sys_dist_ks.remove_view(name.first, name.second)); + bookkeeping_ops->push_back(system_keyspace::remove_built_view(name.first, name.second)); + bookkeeping_ops->push_back( + system_keyspace::remove_view_build_progress_across_all_shards( + std::move(name.first), + std::move(name.second))); + } + return view_ptr(nullptr); + }; + + auto built_views = boost::copy_range>(built + | boost::adaptors::transformed(maybe_fetch_view) + | boost::adaptors::filtered([] (const view_ptr& v) { return bool(v); }) + | boost::adaptors::transformed([] (const view_ptr& v) { return v->id(); })); + + std::vector> view_build_status_per_shard; + for (auto& [view_name, first_token, next_token_opt, cpu_id] : in_progress) { + if (auto view = maybe_fetch_view(view_name)) { + if (built_views.find(view->id()) != built_views.end()) { + if (engine().cpu_id() == 0) { + auto f = _sys_dist_ks.finish_view_build(std::move(view_name.first), std::move(view_name.second)).then([view = std::move(view)] { + return system_keyspace::remove_view_build_progress_across_all_shards(view->ks_name(), view->cf_name()); + }); + bookkeeping_ops->push_back(std::move(f)); + } + continue; + } + view_build_status_per_shard.resize(std::max(view_build_status_per_shard.size(), size_t(cpu_id + 1))); + view_build_status_per_shard[cpu_id].emplace_back(view_build_status{ + std::move(view), +
+ std::move(first_token), + std::move(next_token_opt)}); + } + } + + std::unordered_set loaded_views; + if (view_build_status_per_shard.size() != smp::count) { + reshard(std::move(view_build_status_per_shard), loaded_views); + } else if (!view_build_status_per_shard.empty()) { + for (auto& status : view_build_status_per_shard[engine().cpu_id()]) { + load_view_status(std::move(status), loaded_views); + } + } + + for (auto& [_, build_step] : _base_to_build_step) { + boost::sort(build_step.build_status, [] (const view_build_status& s1, const view_build_status& s2) { + return *s1.next_token < *s2.next_token; + }); + if (!build_step.build_status.empty()) { + build_step.current_key = dht::decorated_key{*build_step.build_status.front().next_token, partition_key::make_empty()}; + } + } + + auto all_views = _db.get_views(); + auto is_new = [&] (const view_ptr& v) { + return base_table_exists(v) && loaded_views.find(v->id()) == loaded_views.end() + && built_views.find(v->id()) == built_views.end(); + }; + for (auto&& view : all_views | boost::adaptors::filtered(is_new)) { + bookkeeping_ops->push_back(add_new_view(view, get_or_create_build_step(view->view_info()->base_id()))); + } + + for (auto& [_, build_step] : _base_to_build_step) { + initialize_reader_at_current_token(build_step); + } + + auto f = seastar::when_all_succeed(bookkeeping_ops->begin(), bookkeeping_ops->end()); + return f.handle_exception([bookkeeping_ops = std::move(bookkeeping_ops)] (std::exception_ptr ep) { + vlogger.error("Failed to update materialized view bookkeeping ({}), continuing anyway.", ep); + }); +} + +future<> view_builder::add_new_view(view_ptr view, build_step& step) { + vlogger.info0("Building view {}.{}, starting at token {}", view->ks_name(), view->cf_name(), step.current_token()); + step.build_status.emplace(step.build_status.begin(), view_build_status{view, step.current_token(), std::nullopt}); + return when_all_succeed( + system_keyspace::register_view_for_building(view->ks_name(), view->cf_name(),
step.current_token()), + _sys_dist_ks.start_view_build(view->ks_name(), view->cf_name())); +} + +static future<> flush_base(lw_shared_ptr base, abort_source& as) { + struct empty_state { }; + return exponential_backoff_retry::do_until_value(1s, 1min, as, [base = std::move(base)] { + return base->flush().then_wrapped([base] (future<> f) -> stdx::optional { + if (f.failed()) { + vlogger.error("Error flushing base table {}.{}: {}; retrying", base->schema()->ks_name(), base->schema()->cf_name(), f.get_exception()); + return { }; + } + return { empty_state{} }; + }); + }).discard_result(); +} + +void view_builder::on_create_view(const sstring& ks_name, const sstring& view_name) { + with_semaphore(_sem, 1, [ks_name, view_name, this] { + auto view = view_ptr(_db.find_schema(ks_name, view_name)); + auto& step = get_or_create_build_step(view->view_info()->base_id()); + return step.base->await_pending_writes().then([this, &step] { + return flush_base(step.base, _as); + }).then([this, view, &step] () mutable { + // This resets the build step to the current token. It may result in views currently + // being built to receive duplicate updates, but it simplifies things as we don't have + // to keep around a list of new views to build the next time the reader crosses a token + // threshold. 
+ initialize_reader_at_current_token(step); + return add_new_view(view, step).then_wrapped([this, view] (future<>&& f) { + if (f.failed()) { + vlogger.error("Error setting up view for building {}.{}: {}", view->ks_name(), view->cf_name(), f.get_exception()); + } + _build_step.trigger(); + }); + }); + }).handle_exception_type([] (no_such_column_family&) { }); +} + +void view_builder::on_update_view(const sstring& ks_name, const sstring& view_name, bool) { + with_semaphore(_sem, 1, [ks_name, view_name, this] { + auto view = view_ptr(_db.find_schema(ks_name, view_name)); + auto step_it = _base_to_build_step.find(view->view_info()->base_id()); + if (step_it == _base_to_build_step.end()) { + return;// In case all the views for this CF have finished building already. + } + auto status_it = boost::find_if(step_it->second.build_status, [view] (const view_build_status& bs) { + return bs.view->id() == view->id(); + }); + if (status_it != step_it->second.build_status.end()) { + status_it->view = std::move(view); + } + }).handle_exception_type([] (no_such_column_family&) { }); +} + +void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name) { + vlogger.info0("Stopping to build view {}.{}", ks_name, view_name); + with_semaphore(_sem, 1, [ks_name, view_name, this] { + // The view is absent from the database at this point, so find it by brute force. 
+ ([&, this] { + for (auto& [_, step] : _base_to_build_step) { + if (step.build_status.empty() || step.build_status.front().view->ks_name() != ks_name) { + continue; + } + for (auto it = step.build_status.begin(); it != step.build_status.end(); ++it) { + if (it->view->cf_name() == view_name) { + _built_views.erase(it->view->id()); + step.build_status.erase(it); + return; + } + } + } + })(); + if (engine().cpu_id() != 0) { + return make_ready_future(); + } + return when_all_succeed( + system_keyspace::remove_view_build_progress_across_all_shards(ks_name, view_name), + system_keyspace::remove_built_view(ks_name, view_name), + _sys_dist_ks.remove_view(ks_name, view_name)).handle_exception([ks_name, view_name] (std::exception_ptr ep) { + vlogger.warn("Failed to cleanup view {}.{}: {}", ks_name, view_name, ep); + }); + }); +} + +future<> view_builder::do_build_step() { + return seastar::async([this] { + exponential_backoff_retry r(1s, 1min); + while (!_base_to_build_step.empty() && !_as.abort_requested()) { + auto units = get_units(_sem, 1).get0(); + try { + execute(_current_step->second, exponential_backoff_retry(1s, 1min)); + r.reset(); + } catch (const abort_requested_exception&) { + return; + } catch (...) { + auto base = _current_step->second.base->schema(); + vlogger.warn("Error executing build step for base {}.{}: {}", base->ks_name(), base->cf_name(), std::current_exception()); + r.retry(_as).get(); + initialize_reader_at_current_token(_current_step->second); + } + if (_current_step->second.build_status.empty()) { + _current_step = _base_to_build_step.erase(_current_step); + } else { + ++_current_step; + } + if (_current_step == _base_to_build_step.end()) { + _current_step = _base_to_build_step.begin(); + } + } + }); +} + +// Called in the context of a seastar::thread. 
+class view_builder::consumer { +public: + struct built_views { + build_step& step; + std::vector views; + + built_views(build_step& step) + : step(step) { + } + + built_views(built_views&& other) + : step(other.step) + , views(std::move(other.views)) { + } + + ~built_views() { + for (auto&& status : views) { + vlogger.debug("Putting view {}.{} back into the build step", status.view->ks_name(), status.view->cf_name()); + // Use step.current_token(), which may have wrapped around and become < first_token. + step.build_status.emplace_back(view_build_status{std::move(status.view), step.current_token(), step.current_token()}); + } + } + + void release() { + views.clear(); + } + }; + +private: + view_builder& _builder; + build_step& _step; + built_views _built_views; + std::vector _views_to_build; + std::deque _fragments; + +public: + consumer(view_builder& builder, build_step& step) + : _builder(builder) + , _step(step) + , _built_views{step} { + if (!step.current_key.key().is_empty(*_step.reader.schema())) { + load_views_to_build(); + } + } + + void load_views_to_build() { + for (auto&& vs : _step.build_status) { + if (_step.current_token() >= vs.next_token) { + if (partition_key_matches(*_step.reader.schema(), *vs.view->view_info(), _step.current_key)) { + _views_to_build.push_back(vs.view); + } + if (vs.next_token || _step.current_token() != vs.first_token) { + vs.next_token = _step.current_key.token(); + } + } else { + break; + } + } + } + + void check_for_built_views() { + for (auto it = _step.build_status.begin(); it != _step.build_status.end();) { + // A view starts being built at token t1. Due to resharding, that may not necessarily be a + // shard-owned token. We finish building the view when the next_token to build is just before + // (or at) the first token, but the shard-owned current token is after (or at) the first token. + // In the system tables, we set first_token = next_token to signal the completion of the build + // process in case of a restart.
+ if (it->next_token && *it->next_token <= it->first_token && _step.current_token() >= it->first_token) { + _built_views.views.push_back(std::move(*it)); + it = _step.build_status.erase(it); + } else { + ++it; + } + } + } + + stop_iteration consume_new_partition(const dht::decorated_key& dk) { + _step.current_key = std::move(dk); + check_for_built_views(); + _views_to_build.clear(); + load_views_to_build(); + return stop_iteration(_views_to_build.empty()); + } + + stop_iteration consume(tombstone) { + return stop_iteration::no; + } + + stop_iteration consume(static_row&&, tombstone, bool) { + return stop_iteration::no; + } + + stop_iteration consume(clustering_row&& cr, row_tombstone, bool) { + if (_views_to_build.empty() || _builder._as.abort_requested()) { + return stop_iteration::yes; + } + + _fragments.push_back(std::move(cr)); + return stop_iteration::no; + } + + stop_iteration consume(range_tombstone&&) { + return stop_iteration::no; + } + + stop_iteration consume_end_of_partition() { + _builder._as.check(); + if (!_fragments.empty()) { + _fragments.push_front(partition_start(_step.current_key, tombstone())); + _step.base->populate_views( + _views_to_build, + _step.current_token(), + make_flat_mutation_reader_from_fragments(_step.base->schema(), std::move(_fragments))).get(); + _fragments.clear(); + } + return stop_iteration(_step.build_status.empty()); + } + + built_views consume_end_of_stream() { + if (vlogger.is_enabled(log_level::debug)) { + auto view_names = boost::copy_range>( + _views_to_build | boost::adaptors::transformed([](auto v) { + return v->cf_name(); + })); + vlogger.debug("Completed build step for base {}.{}, at token {}; views={}", _step.base->schema()->ks_name(), + _step.base->schema()->cf_name(), _step.current_token(), view_names); + } + if (_step.reader.is_end_of_stream() && _step.reader.is_buffer_empty()) { + _step.current_key = {dht::minimum_token(), partition_key::make_empty()}; + for (auto&& vs : _step.build_status) { + vs.next_token 
= dht::minimum_token(); + } + _builder.initialize_reader_at_current_token(_step); + check_for_built_views(); + } + return std::move(_built_views); + } +}; + +// Called in the context of a seastar::thread. +void view_builder::execute(build_step& step, exponential_backoff_retry r) { + auto consumer = compact_for_query( + *step.reader.schema(), + gc_clock::now(), + step.pslice, + batch_size, + query::max_partitions, + view_builder::consumer{*this, step}); + consumer.consume_new_partition(step.current_key); // Initialize the state in case we're resuming a partition + auto built = step.reader.consume_in_thread(std::move(consumer)); + + _as.check(); + + std::vector> bookkeeping_ops; + bookkeeping_ops.reserve(built.views.size() + step.build_status.size()); + for (auto& [view, first_token, _] : built.views) { + bookkeeping_ops.push_back(maybe_mark_view_as_built(view, first_token)); + } + built.release(); + for (auto& [view, _, next_token] : step.build_status) { + if (next_token) { + bookkeeping_ops.push_back( + system_keyspace::update_view_build_progress(view->ks_name(), view->cf_name(), *next_token)); + } + } + seastar::when_all_succeed(bookkeeping_ops.begin(), bookkeeping_ops.end()).handle_exception([] (std::exception_ptr ep) { + vlogger.error("Failed to update materialized view bookkeeping ({}), continuing anyway.", ep); + }).get(); +} + +future<> view_builder::maybe_mark_view_as_built(view_ptr view, dht::token next_token) { + _built_views.emplace(view->id()); + vlogger.debug("Shard finished building view {}.{}", view->ks_name(), view->cf_name()); + return container().map_reduce0( + [view_id = view->id()] (view_builder& builder) { + return builder._built_views.count(view_id); + }, + true, + [] (bool result, bool shard_complete) { + return result & shard_complete; + }).then([this, view, next_token = std::move(next_token)] (bool built) { + if (built) { + return container().invoke_on_all([view_id = view->id()] (view_builder& builder) { + if 
(builder._built_views.erase(view_id) == 0 || engine().cpu_id() != 0) { + return make_ready_future<>(); + } + auto view = builder._db.find_schema(view_id); + vlogger.info("Finished building view {}.{}", view->ks_name(), view->cf_name()); + return seastar::when_all_succeed( + system_keyspace::mark_view_as_built(view->ks_name(), view->cf_name()), + builder._sys_dist_ks.finish_view_build(view->ks_name(), view->cf_name())).then([view] { + return system_keyspace::remove_view_build_progress_across_all_shards(view->ks_name(), view->cf_name()); + }).then([&builder, view] { + auto it = builder._build_notifiers.find(std::pair(view->ks_name(), view->cf_name())); + if (it != builder._build_notifiers.end()) { + it->second.set_value(); + } + }); + }); + } + return system_keyspace::update_view_build_progress(view->ks_name(), view->cf_name(), next_token); + }); +} + +future<> view_builder::wait_until_built(const sstring& ks_name, const sstring& view_name, lowres_clock::time_point timeout) { + return container().invoke_on(0, [ks_name, view_name, timeout] (view_builder& builder) { + auto v = std::pair(std::move(ks_name), std::move(view_name)); + return builder._build_notifiers[std::move(v)].get_shared_future(timeout); + }); } } // namespace view } // namespace db - diff --git a/db/view/view.hh b/db/view/view.hh index 3f21fb21cb..92bdef11a8 100644 --- a/db/view/view.hh +++ b/db/view/view.hh @@ -92,7 +92,7 @@ query::clustering_row_ranges calculate_affected_clustering_ranges( const mutation_partition& mp, const std::vector& views); -void mutate_MV(const dht::token& base_token, +future<> mutate_MV(const dht::token& base_token, std::vector mutations); } diff --git a/db/view/view_builder.hh b/db/view/view_builder.hh new file mode 100644 index 0000000000..18d7780327 --- /dev/null +++ b/db/view/view_builder.hh @@ -0,0 +1,196 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. 
+ * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include "database_fwd.hh" +#include "db/system_keyspace.hh" +#include "db/system_distributed_keyspace.hh" +#include "dht/i_partitioner.hh" +#include "keys.hh" +#include "query-request.hh" +#include "service/migration_listener.hh" +#include "service/migration_manager.hh" +#include "sstables/sstable_set.hh" +#include "utils/exponential_backoff_retry.hh" +#include "utils/serialized_action.hh" +#include "utils/UUID.hh" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace db::view { + +/** + * The view_builder is a sharded service responsible for building all defined materialized views. + * This process entails walking over the existing data in a given base table, and using it to + * calculate and insert the respective entries for one or more views. + * + * We employ a flat_mutation_reader for each base table for which we're building views. + * + * We aim to be resource-conscious. On a given shard, at any given moment, we consume at most + * from one reader. We also strive for fairness, in that each build step inserts entries for + * the views of a different base. Each build step reads and generates updates for batch_size rows. 
+ * + * We lack a controller, which could potentially allow us to go faster (to execute multiple steps at + * the same time, or consume more rows per batch), and which would also apply backpressure, so we + * could, for example, delay executing a build step. + * + * View building is necessarily a sharded process. That means that on restart, if the number of shards + * has changed, we need to calculate the most conservative token range that has been built, and build + * the remainder. + * + * Interaction with the system tables: + * - When we start building a view, we add an entry to the scylla_views_builds_in_progress + * system table. If the node restarts at this point, we'll consider these newly inserted + * views as having made no progress, and we'll treat them as new views; + * - When we finish a build step, we update the progress of the views that we built during + * this step by writing the next token to the scylla_views_builds_in_progress table. If + * the node restarts here, we'll start building the views at the token in the next_token column. + * - When we finish building a view, we mark it as completed in the built views system table, and + * remove it from the in-progress system table. Under failure, the following can happen: + * * When we fail to mark the view as built, we'll redo the last step upon node reboot; + * * When we fail to delete the in-progress record, upon reboot we'll remove this record. + * A view is marked as completed only when all shards have finished their share of the work, that is, + * if a view is not built, then all shards will still have an entry in the in-progress system table. + * - A view that a shard finished building, but not all other shards, remains in the in-progress system + * table, with first_token == next_token.
+ * Interaction with the distributed system table (view_build_status): + * - When we start building a view, we mark the view build as being in-progress; + * - When we finish building a view, we mark the view as being built. Upon failure, + * we ensure that if the view is in the in-progress system table, then it may not + * have been written to this table. We don't load the built views from this table + * when starting. When starting, the following happens: + * * If the view is in the system.built_views table and not the in-progress + * system table, then it will be in view_build_status; + * * If the view is in the system.built_views table and not in this one, it + * will still be in the in-progress system table - we detect this and mark + * it as built in this table too, keeping the invariant; + * * If the view is in this table but not in system.built_views, then it will + * also be in the in-progress system table - we don't detect this and will + * redo the missing step, for simplicity. + */ +class view_builder final : public service::migration_listener::only_view_notifications, public seastar::peering_sharded_service { + /** + * Keeps track of the build progress for a particular view. + * When the view is built, next_token == first_token. + */ + struct view_build_status final { + view_ptr view; + dht::token first_token; + std::optional next_token; + }; + + /** + * Keeps track of the build progress for all the views of a particular + * base table. Each execution of the build step comprises a query of + * the base table for the selected range. + * + * We pin the set of sstables that potentially contain data that should be added to a + * view (they are pinned by the flat_mutation_reader). Adding a view v' overwrites the + * set of pinned sstables, regardless of there being another view v'' being built. The + * new set will potentially contain new data already in v'', written as part of the write + * path. 
We assume this case is rare and optimize for less disk space to the detriment of + * network bandwidth. + */ + struct build_step final { + // Ensure we pin the column_family. It may happen that all views are removed, + // and the base table too, before we can detect it. + lw_shared_ptr base; + query::partition_slice pslice; + dht::partition_range prange; + flat_mutation_reader reader{nullptr}; + dht::decorated_key current_key{dht::minimum_token(), partition_key::make_empty()}; + std::vector build_status; + + const dht::token& current_token() const { + return current_key.token(); + } + }; + + using base_to_build_step_type = std::unordered_map; + + database& _db; + db::system_distributed_keyspace& _sys_dist_ks; + service::migration_manager& _mm; + base_to_build_step_type _base_to_build_step; + base_to_build_step_type::iterator _current_step = _base_to_build_step.end(); + serialized_action _build_step{std::bind(&view_builder::do_build_step, this)}; + // Ensures bookkeeping operations are serialized, meaning that while we execute + // a build step we don't consider newly added or removed views. This simplifies + // the algorithms. Also synchronizes an operation wrt. a call to stop(). + seastar::semaphore _sem{1}; + seastar::abort_source _as; + // Used to coordinate between shards the conclusion of the build process for a particular view. + std::unordered_set _built_views; + // Used for testing. + std::unordered_map, seastar::shared_promise<>, utils::tuple_hash> _build_notifiers; + +public: + static constexpr size_t batch_size = 128; + +public: + view_builder(database&, db::system_distributed_keyspace&, service::migration_manager&); + view_builder(view_builder&&) = delete; + + /** + * Loads the state stored in the system tables to resume building the existing views. + * Requires that all views have been loaded from the system tables and are accessible + * through the database, and that the commitlog has been replayed.
+ */ + future<> start(); + + /** + * Stops the view building process. + */ + future<> stop(); + + virtual void on_create_view(const sstring& ks_name, const sstring& view_name) override; + virtual void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override; + virtual void on_drop_view(const sstring& ks_name, const sstring& view_name) override; + + // For tests + future<> wait_until_built(const sstring& ks_name, const sstring& view_name, lowres_clock::time_point timeout); + +private: + build_step& get_or_create_build_step(utils::UUID); + void initialize_reader_at_current_token(build_step&); + void load_view_status(view_build_status, std::unordered_set&); + void reshard(std::vector>, std::unordered_set&); + future<> calculate_shard_build_step(std::vector, std::vector); + future<> add_new_view(view_ptr, build_step&); + future<> do_build_step(); + void execute(build_step&, exponential_backoff_retry); + future<> maybe_mark_view_as_built(view_ptr, dht::token); + + struct consumer; +}; + +} \ No newline at end of file diff --git a/flat_mutation_reader.cc b/flat_mutation_reader.cc index 2d30b4d411..67b648ec80 100644 --- a/flat_mutation_reader.cc +++ b/flat_mutation_reader.cc @@ -621,3 +621,38 @@ make_flat_multi_range_reader(schema_ptr s, mutation_source source, const dht::pa return make_flat_mutation_reader(std::move(s), std::move(source), ranges, slice, pc, std::move(trace_state), fwd_mr); } + +// Returns a reader that emits the given fragments in order; fast-forwarding is not supported. +flat_mutation_reader +make_flat_mutation_reader_from_fragments(schema_ptr schema, std::deque fragments) { + class reader : public flat_mutation_reader::impl { + std::deque _fragments; + public: + reader(schema_ptr schema, std::deque fragments) + : flat_mutation_reader::impl(std::move(schema)) + , _fragments(std::move(fragments)) { + } + virtual future<> fill_buffer(db::timeout_clock::time_point) override { + while (!(_end_of_stream = _fragments.empty()) && !is_buffer_full()) { + push_mutation_fragment(std::move(_fragments.front())); + 
_fragments.pop_front(); + } + return make_ready_future<>(); + } + virtual void next_partition() override { + clear_buffer_to_next_partition(); + if (is_buffer_empty()) { + while (!(_end_of_stream = _fragments.empty()) && !_fragments.front().is_partition_start()) { + _fragments.pop_front(); + } + } + } + virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override { + throw std::runtime_error("This reader can't be fast forwarded to another position."); + } + virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { + throw std::runtime_error("This reader can't be fast forwarded to another range."); + } + }; + return make_flat_mutation_reader(std::move(schema), std::move(fragments)); +} diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh index 3aafedf71a..fc62086e2a 100644 --- a/flat_mutation_reader.hh +++ b/flat_mutation_reader.hh @@ -33,6 +33,8 @@ #include #include "db/timeout_clock.hh" +#include + using seastar::future; class mutation_source; @@ -557,6 +559,9 @@ make_flat_multi_range_reader(schema_ptr s, mutation_source source, const dht::pa tracing::trace_state_ptr trace_state = nullptr, flat_mutation_reader::partition_range_forwarding fwd_mr = flat_mutation_reader::partition_range_forwarding::yes); +flat_mutation_reader +make_flat_mutation_reader_from_fragments(schema_ptr, std::deque); + // Calls the consumer for each element of the reader's stream until end of stream // is reached or the consumer requests iteration to stop by returning stop_iteration::yes. // The consumer should accept mutation as the argument and return stop_iteration. diff --git a/init.cc b/init.cc index 6f5d9b993f..dbd72eb1bb 100644 --- a/init.cc +++ b/init.cc @@ -34,8 +34,8 @@ logging::logger startlog("init"); // duplicated in cql_test_env.cc // until proper shutdown is done. 
-void init_storage_service(distributed& db, sharded& auth_service) { - service::init_storage_service(db, auth_service).get(); +void init_storage_service(distributed& db, sharded& auth_service, sharded& sys_dist_ks) { + service::init_storage_service(db, auth_service, sys_dist_ks).get(); // #293 - do not stop anything //engine().at_exit([] { return service::deinit_storage_service(); }); } diff --git a/init.hh b/init.hh index 61b6728b57..c8f9eb960b 100644 --- a/init.hh +++ b/init.hh @@ -25,6 +25,7 @@ #include #include "auth/service.hh" #include "db/config.hh" +#include "db/system_distributed_keyspace.hh" #include "database.hh" #include "log.hh" @@ -36,7 +37,7 @@ extern logging::logger startlog; class bad_configuration_error : public std::exception {}; -void init_storage_service(distributed& db, sharded&); +void init_storage_service(distributed& db, sharded&, sharded&); void init_ms_fd_gossiper(sstring listen_address , uint16_t storage_port , uint16_t ssl_storage_port diff --git a/keys.hh b/keys.hh index 81f112c7e1..9fce736d59 100644 --- a/keys.hh +++ b/keys.hh @@ -304,6 +304,10 @@ public: return get_compound_type(s)->end(_bytes); } + bool is_empty(const schema& s) const { + return begin(s) == end(s); + } + // Returns a range of bytes_view auto components() const { return TopLevelView::compound::element_type::components(representation()); diff --git a/main.cc b/main.cc index 2cc82da107..f32ddac636 100644 --- a/main.cc +++ b/main.cc @@ -35,10 +35,12 @@ #include "service/load_broadcaster.hh" #include "streaming/stream_session.hh" #include "db/system_keyspace.hh" +#include "db/system_distributed_keyspace.hh" #include "db/batchlog_manager.hh" #include "db/commitlog/commitlog.hh" #include "db/hints/manager.hh" #include "db/commitlog/commitlog_replayer.hh" +#include "db/view/view_builder.hh" #include "utils/runtime.hh" #include "utils/file_lock.hh" #include "log.hh" @@ -457,9 +459,11 @@ int main(int ac, char** av) { ctx.http_server.listen(ipv4_addr{ip, api_port}).get(); 
startlog.info("Scylla API server listening on {}:{} ...", api_address, api_port); static sharded auth_service; + static sharded sys_dist_ks; supervisor::notify("initializing storage service"); - init_storage_service(db, auth_service); + init_storage_service(db, auth_service, sys_dist_ks); supervisor::notify("starting per-shard database core"); + // Note: changed from using a move here, because we want the config object intact. database_config dbcfg; auto make_sched_group = [&] (sstring name, unsigned shares) { @@ -614,7 +618,7 @@ int main(int ac, char** av) { } // If the same sstable is shared by several shards, it cannot be // deleted until all shards decide to compact it. So we want to - // start thse compactions now. Note we start compacting only after + // start these compactions now. Note we start compacting only after // all sstables in this CF were loaded on all shards - otherwise // we will have races between the compaction and loading processes // We also want to trigger regular compaction on boot. 
@@ -688,6 +692,11 @@ int main(int ac, char** av) { proxy.invoke_on_all([] (service::storage_proxy& local_proxy) { local_proxy.start_hints_manager(gms::get_local_gossiper().shared_from_this()); }).get(); } + static sharded view_builder; + supervisor::notify("starting the view builder"); + view_builder.start(std::ref(db), std::ref(sys_dist_ks), std::ref(mm)).get(); + view_builder.invoke_on_all(&db::view::view_builder::start).get(); + supervisor::notify("starting native transport"); service::get_local_storage_service().start_native_transport().get(); if (start_thrift) { @@ -718,6 +727,10 @@ int main(int ac, char** av) { return service::get_local_storage_service().drain_on_shutdown(); }); + engine().at_exit([] { + return view_builder.stop(); + }); + engine().at_exit([&db] { return db.invoke_on_all([](auto& db) { return db.get_compaction_manager().stop(); diff --git a/service/migration_listener.hh b/service/migration_listener.hh index 1d201ba230..945a0b937f 100644 --- a/service/migration_listener.hh +++ b/service/migration_listener.hh @@ -73,6 +73,27 @@ public: virtual void on_drop_function(const sstring& ks_name, const sstring& function_name) = 0; virtual void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) = 0; virtual void on_drop_view(const sstring& ks_name, const sstring& view_name) = 0; + + class only_view_notifications; +}; + +// Listener base whose non-view notifications are all no-ops, for subclasses that only handle view events. +class migration_listener::only_view_notifications : public migration_listener { + virtual void on_create_keyspace(const sstring& ks_name) override { } + virtual void on_create_column_family(const sstring& ks_name, const sstring& cf_name) override { } + virtual void on_create_user_type(const sstring& ks_name, const sstring& type_name) override { } + virtual void on_create_function(const sstring& ks_name, const sstring& function_name) override { } + virtual void on_create_aggregate(const sstring& ks_name, const sstring& aggregate_name) override { } + virtual void on_update_keyspace(const sstring& ks_name) override { } + virtual void on_update_column_family(const sstring& 
ks_name, const sstring& cf_name, bool columns_changed) override { } + virtual void on_update_user_type(const sstring& ks_name, const sstring& type_name) override { } + virtual void on_update_function(const sstring& ks_name, const sstring& function_name) override { } + virtual void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) override { } + virtual void on_drop_keyspace(const sstring& ks_name) override { } + virtual void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override { } + virtual void on_drop_user_type(const sstring& ks_name, const sstring& type_name) override { } + virtual void on_drop_function(const sstring& ks_name, const sstring& function_name) override { } + virtual void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) override { } }; } diff --git a/service/storage_service.cc b/service/storage_service.cc index f8cc9ac9fa..f27d29adcc 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -105,11 +105,12 @@ int get_generation_number() { return generation_number; } -storage_service::storage_service(distributed& db, sharded& auth_service) +storage_service::storage_service(distributed& db, sharded& auth_service, sharded& sys_dist_ks) : _db(db) , _auth_service(auth_service) , _replicate_action([this] { return do_replicate_to_all_cores(); }) - , _update_pending_ranges_action([this] { return do_update_pending_ranges(); }) { + , _update_pending_ranges_action([this] { return do_update_pending_ranges(); }) + , _sys_dist_ks(sys_dist_ks) { sstable_read_error.connect([this] { isolate_on_error(); }); sstable_write_error.connect([this] { isolate_on_error(); }); general_disk_error.connect([this] { isolate_on_error(); }); @@ -551,6 +552,12 @@ void storage_service::join_token_ring(int delay) { supervisor::notify("starting tracing"); tracing::tracing::start_tracing().get(); + + supervisor::notify("starting system distributed keyspace"); + _sys_dist_ks.start( + std::ref(cql3::get_query_processor()), + std::ref(service::get_migration_manager())).get(); + 
_sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start).get(); } else { slogger.info("Startup complete, but write survey mode is active, not becoming an active ring member. Use JMX (StorageService->joinRing()) to finalize ring joining."); } @@ -1259,6 +1266,9 @@ future<> storage_service::drain_on_shutdown() { tracing::tracing::tracing_instance().stop().get(); slogger.info("Drain on shutdown: tracing is stopped"); + ss._sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::stop).get(); + slogger.info("Drain on shutdown: system distributed keyspace stopped"); + get_storage_proxy().invoke_on_all([] (storage_proxy& local_proxy) { return local_proxy.stop_hints_manager(); }).get(); @@ -3419,5 +3429,18 @@ storage_service::get_natural_endpoints(const sstring& keyspace, const token& pos return _db.local().find_keyspace(keyspace).get_replication_strategy().get_natural_endpoints(pos); } +future> +storage_service::view_build_statuses(sstring keyspace, sstring view_name) const { + return _sys_dist_ks.local().view_status(std::move(keyspace), std::move(view_name)).then([this] (std::unordered_map status) { + auto& endpoint_to_host_id = get_token_metadata().get_endpoint_to_host_id_map_for_reading(); + return boost::copy_range>(endpoint_to_host_id + | boost::adaptors::transformed([&status] (const std::pair& p) { + auto it = status.find(p.second); + auto s = it != status.end() ? 
std::move(it->second) : "UNKNOWN"; + return std::pair(p.first.to_sstring(), std::move(s)); + })); + }); +} + } // namespace service diff --git a/service/storage_service.hh b/service/storage_service.hh index 3e971fb65a..df890b9c91 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -51,6 +51,7 @@ #include "dht/token_range_endpoints.hh" #include "core/sleep.hh" #include "gms/application_state.hh" +#include "db/system_distributed_keyspace.hh" #include "core/semaphore.hh" #include "utils/fb_utilities.hh" #include "utils/serialized_action.hh" @@ -133,7 +134,7 @@ private: bool _ms_stopped = false; bool _stream_manager_stopped = false; public: - storage_service(distributed& db, sharded&); + storage_service(distributed& db, sharded&, sharded&); void isolate_on_error(); void isolate_on_commit_error(); @@ -732,6 +733,7 @@ private: future<> do_replicate_to_all_cores(); serialized_action _replicate_action; serialized_action _update_pending_ranges_action; + sharded& _sys_dist_ks; private: /** * Replicates token_metadata contents on shard0 instance to other shards. 
@@ -2026,6 +2028,8 @@ public: } #endif + future> view_build_statuses(sstring keyspace, sstring view_name) const; + private: /** * Seed data to the endpoints that will be responsible for it at the future @@ -2307,8 +2311,8 @@ public: } }; -inline future<> init_storage_service(distributed& db, sharded& auth_service) { - return service::get_storage_service().start(std::ref(db), std::ref(auth_service)); +inline future<> init_storage_service(distributed& db, sharded& auth_service, sharded& sys_dist_ks) { + return service::get_storage_service().start(std::ref(db), std::ref(auth_service), std::ref(sys_dist_ks)); } inline future<> deinit_storage_service() { diff --git a/test.py b/test.py index d5a88c181b..5cf316641f 100755 --- a/test.py +++ b/test.py @@ -88,6 +88,7 @@ boost_tests = [ 'counter_test', 'cell_locker_test', 'view_schema_test', + 'view_build_test', 'clustering_ranges_walker_test', 'vint_serialization_test', 'duration_test', diff --git a/tests/cql_assertions.cc b/tests/cql_assertions.cc index 9c9ba409ca..c2f68f679f 100644 --- a/tests/cql_assertions.cc +++ b/tests/cql_assertions.cc @@ -54,6 +54,16 @@ rows_assertions::is_empty() { return {*this}; } +// Reports a failure via fail() when the result set has no rows; otherwise returns the assertion for chaining. +rows_assertions +rows_assertions::is_not_empty() { + auto row_count = _rows->rs().size(); + if (row_count == 0) { + fail("Expected some rows, but result was empty"); + } + return {*this}; +} + rows_assertions rows_assertions::with_row(std::initializer_list values) { std::vector expected_row(values); diff --git a/tests/cql_assertions.hh b/tests/cql_assertions.hh index 79c6b73ced..26192df6ef 100644 --- a/tests/cql_assertions.hh +++ b/tests/cql_assertions.hh @@ -33,6 +33,7 @@ public: rows_assertions(shared_ptr rows); rows_assertions with_size(size_t size); rows_assertions is_empty(); + rows_assertions is_not_empty(); rows_assertions with_row(std::initializer_list values); // Verifies that the result has the following rows and only that rows, in that order. 
diff --git a/tests/cql_test_env.cc b/tests/cql_test_env.cc index f96e217ac4..0b152d2860 100644 --- a/tests/cql_test_env.cc +++ b/tests/cql_test_env.cc @@ -39,6 +39,7 @@ #include "tmpdir.hh" #include "db/query_context.hh" #include "test_services.hh" +#include "db/view/view_builder.hh" // TODO: remove (#293) #include "message/messaging_service.hh" @@ -47,6 +48,7 @@ #include "service/storage_service.hh" #include "auth/service.hh" #include "db/system_keyspace.hh" +#include "db/system_distributed_keyspace.hh" namespace sstables { @@ -88,6 +90,7 @@ public: private: ::shared_ptr> _db; ::shared_ptr> _auth_service; + ::shared_ptr> _view_builder; lw_shared_ptr _data_dir; private: struct core_local_state { @@ -112,7 +115,13 @@ private: return ::make_shared(_core_local.local().client_state); } public: - single_node_cql_env(::shared_ptr> db, ::shared_ptr> auth_service) : _db(db), _auth_service(std::move(auth_service)) + single_node_cql_env( + ::shared_ptr> db, + ::shared_ptr> auth_service, + ::shared_ptr> view_builder) + : _db(db) + , _auth_service(std::move(auth_service)) + , _view_builder(std::move(view_builder)) { } virtual future<::shared_ptr> execute_cql(const sstring& text) override { @@ -255,6 +264,10 @@ public: return _auth_service->local(); } + virtual db::view::view_builder& local_view_builder() override { + return _view_builder->local(); + } + future<> start() { return _core_local.start(std::ref(*_auth_service)); } @@ -308,9 +321,11 @@ public: auto stop_ms = defer([&ms] { ms.stop().get(); }); auto auth_service = ::make_shared>(); + auto sys_dist_ks = seastar::sharded(); + auto stop_sys_dist_ks = defer([&sys_dist_ks] { sys_dist_ks.stop().get(); }); auto& ss = service::get_storage_service(); - ss.start(std::ref(*db), std::ref(*auth_service)).get(); + ss.start(std::ref(*db), std::ref(*auth_service), std::ref(sys_dist_ks)).get(); auto stop_storage_service = defer([&ss] { ss.stop().get(); }); db->start(std::move(*cfg), database_config()).get(); @@ -369,6 +384,13 @@ 
public: auth_service->stop().get(); }); + auto view_builder = ::make_shared>(); + view_builder->start(std::ref(*db), std::ref(sys_dist_ks), std::ref(mm)).get(); + view_builder->invoke_on_all(&db::view::view_builder::start).get(); + auto stop_view_builder = defer([view_builder] { + view_builder->stop().get(); + }); + // Create the testing user. try { auth::role_config config; @@ -384,7 +406,7 @@ public: // The default user may already exist if this `cql_test_env` is starting with previously populated data. } - single_node_cql_env env(db, auth_service); + single_node_cql_env env(db, auth_service, view_builder); env.start().get(); auto stop_env = defer([&env] { env.stop().get(); }); @@ -423,12 +445,13 @@ future<> do_with_cql_env_thread(std::function func) { class storage_service_for_tests::impl { distributed _db; sharded _auth_service; + sharded _sys_dist_ks; public: impl() { auto thread = seastar::thread_impl::get(); assert(thread); netw::get_messaging_service().start(gms::inet_address("127.0.0.1"), 7000, false).get(); - service::get_storage_service().start(std::ref(_db), std::ref(_auth_service)).get(); + service::get_storage_service().start(std::ref(_db), std::ref(_auth_service), std::ref(_sys_dist_ks)).get(); service::get_storage_service().invoke_on_all([] (auto& ss) { ss.enable_all_features(); }).get(); diff --git a/tests/cql_test_env.hh b/tests/cql_test_env.hh index ebe5236852..173e6c5e2d 100644 --- a/tests/cql_test_env.hh +++ b/tests/cql_test_env.hh @@ -38,6 +38,10 @@ class database; +namespace db::view { +class view_builder; +} + namespace auth { class service; } @@ -95,9 +99,28 @@ public: virtual distributed & qp() = 0; virtual auth::service& local_auth_service() = 0; + + virtual db::view::view_builder& local_view_builder() = 0; }; future<> do_with_cql_env(std::function(cql_test_env&)> func); future<> do_with_cql_env(std::function(cql_test_env&)> func, const db::config&); future<> do_with_cql_env_thread(std::function func); future<> 
do_with_cql_env_thread(std::function func, const db::config&); + +template +static void eventually(EventuallySucceedingFunction&& f, size_t max_attempts = 10) { + size_t attempts = 0; + while (true) { + try { + f(); + break; + } catch (...) { + if (++attempts < max_attempts) { + sleep(std::chrono::milliseconds(1 << attempts)).get0(); + } else { + throw; + } + } + } +} \ No newline at end of file diff --git a/tests/gossip.cc b/tests/gossip.cc index c42de5b6fc..8cac8a8b3c 100644 --- a/tests/gossip.cc +++ b/tests/gossip.cc @@ -22,6 +22,7 @@ #include "core/reactor.hh" #include "core/app-template.hh" +#include "db/system_distributed_keyspace.hh" #include "message/messaging_service.hh" #include "gms/failure_detector.hh" #include "gms/gossiper.hh" @@ -69,7 +70,9 @@ int main(int ac, char ** av) { utils::fb_utilities::set_broadcast_rpc_address(listen); auto vv = std::make_shared(); locator::i_endpoint_snitch::create_snitch("SimpleSnitch").then([&auth_service, &db] { - return service::init_storage_service(db, auth_service); + // static: storage_service stores a reference to this object, so it must outlive this continuation. + static sharded sys_dist_ks; + return service::init_storage_service(db, auth_service, sys_dist_ks); }).then([vv, listen, config] { return netw::get_messaging_service().start(listen); }).then([config] { diff --git a/tests/gossip_test.cc b/tests/gossip_test.cc index 12fd4867c4..2def02f6d1 100644 --- a/tests/gossip_test.cc +++ b/tests/gossip_test.cc @@ -31,14 +31,16 @@ #include "service/storage_service.hh" #include "core/distributed.hh" #include "database.hh" +#include "db/system_distributed_keyspace.hh" SEASTAR_TEST_CASE(test_boot_shutdown){ return seastar::async([] { distributed db; sharded auth_service; + sharded sys_dist_ks; utils::fb_utilities::set_broadcast_address(gms::inet_address("127.0.0.1")); locator::i_endpoint_snitch::create_snitch("SimpleSnitch").get(); - service::get_storage_service().start(std::ref(db), std::ref(auth_service)).get(); + service::get_storage_service().start(std::ref(db), std::ref(auth_service), std::ref(sys_dist_ks)).get(); 
db.start().get(); netw::get_messaging_service().start(gms::inet_address("127.0.0.1")).get(); gms::get_failure_detector().start().get(); diff --git a/tests/view_build_test.cc b/tests/view_build_test.cc new file mode 100644 index 0000000000..1489231eb1 --- /dev/null +++ b/tests/view_build_test.cc @@ -0,0 +1,342 @@ +/* + * Copyright (C) 2018 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . 
+ */ + +#include + +#include "database.hh" +#include "db/view/view_builder.hh" +#include "db/system_keyspace.hh" + +#include "tests/test-utils.hh" +#include "tests/cql_test_env.hh" +#include "tests/cql_assertions.hh" + +using namespace std::literals::chrono_literals; + +SEASTAR_TEST_CASE(test_builder_with_large_partition) { + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("create table cf (p int, c int, v int, primary key (p, c))").get(); + + for (auto i = 0; i < 1024; ++i) { + e.execute_cql(sprint("insert into cf (p, c, v) values (0, %d, 0)", i)).get(); + } + + auto f = e.local_view_builder().wait_until_built("ks", "vcf", lowres_clock::now() + 10s); + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and c is not null and v is not null " + "primary key (v, c, p)").get(); + + f.get(); + auto built = db::system_keyspace::load_built_views().get0(); + BOOST_REQUIRE_EQUAL(built.size(), 1); + BOOST_REQUIRE_EQUAL(built[0].second, sstring("vcf")); + + auto msg = e.execute_cql("select count(*) from vcf where v = 0").get0(); + assert_that(msg).is_rows().with_size(1); + assert_that(msg).is_rows().with_rows({{{long_type->decompose(1024L)}}}); + }); +} + +SEASTAR_TEST_CASE(test_builder_with_multiple_partitions) { + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("create table cf (p int, c int, v int, primary key (p, c))").get(); + + for (auto i = 0; i < 1024; ++i) { + e.execute_cql(sprint("insert into cf (p, c, v) values (%d, %d, 0)", i % 5, i)).get(); + } + + auto f = e.local_view_builder().wait_until_built("ks", "vcf", lowres_clock::now() + 10s); + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and c is not null and v is not null " + "primary key (v, c, p)").get(); + + f.get(); + auto built = db::system_keyspace::load_built_views().get0(); + BOOST_REQUIRE_EQUAL(built.size(), 1); + BOOST_REQUIRE_EQUAL(built[0].second, sstring("vcf")); + + auto msg = 
e.execute_cql("select count(*) from vcf where v = 0").get0(); + assert_that(msg).is_rows().with_size(1); + assert_that(msg).is_rows().with_rows({{{long_type->decompose(1024L)}}}); + }); +} + +SEASTAR_TEST_CASE(test_builder_with_multiple_partitions_of_batch_size_rows) { + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("create table cf (p int, c int, v int, primary key (p, c))").get(); + + for (auto i = 0; i < 1024; ++i) { + e.execute_cql(sprint("insert into cf (p, c, v) values (%d, %d, 0)", i % db::view::view_builder::batch_size, i)).get(); + } + + auto f = e.local_view_builder().wait_until_built("ks", "vcf", lowres_clock::now() + 10s); + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and c is not null and v is not null " + "primary key (v, c, p)").get(); + + f.get(); + auto built = db::system_keyspace::load_built_views().get0(); + BOOST_REQUIRE_EQUAL(built.size(), 1); + BOOST_REQUIRE_EQUAL(built[0].second, sstring("vcf")); + + auto msg = e.execute_cql("select count(*) from vcf where v = 0").get0(); + assert_that(msg).is_rows().with_size(1); + assert_that(msg).is_rows().with_rows({{{long_type->decompose(1024L)}}}); + }); +} + +SEASTAR_TEST_CASE(test_builder_view_added_during_ongoing_build) { + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("create table cf (p int, c int, v int, primary key (p, c))").get(); + + for (auto i = 0; i < 5000; ++i) { + e.execute_cql(sprint("insert into cf (p, c, v) values (0, %d, 0)", i)).get(); + } + + auto f1 = e.local_view_builder().wait_until_built("ks", "vcf1", lowres_clock::now() + 60s); + auto f2 = e.local_view_builder().wait_until_built("ks", "vcf2", lowres_clock::now() + 30s); + + e.execute_cql("create materialized view vcf1 as select * from cf " + "where p is not null and c is not null and v is not null " + "primary key (v, c, p)").get(); + + sleep(1s).get(); + + e.execute_cql("create materialized view vcf2 as select * from cf " + "where p is 
not null and c is not null and v is not null " + "primary key (v, p, c)").get(); + + f2.get(); + f1.get(); + auto built = db::system_keyspace::load_built_views().get0(); + BOOST_REQUIRE_EQUAL(built.size(), 2); + + auto msg = e.execute_cql("select count(*) from vcf1 where v = 0").get0(); + assert_that(msg).is_rows().with_size(1); + assert_that(msg).is_rows().with_rows({{{long_type->decompose(5000L)}}}); + + msg = e.execute_cql("select count(*) from vcf2 where v = 0").get0(); + assert_that(msg).is_rows().with_size(1); + assert_that(msg).is_rows().with_rows({{{long_type->decompose(5000L)}}}); + }); +} + +std::mt19937 random_generator() { + std::random_device rd; + // In case of errors, replace the seed with a fixed value to get a deterministic run. + auto seed = rd(); + std::cout << "Random seed: " << seed << "\n"; + return std::mt19937(seed); +} + +bytes random_bytes(size_t size, std::mt19937& gen) { + bytes result(bytes::initialized_later(), size); + static thread_local std::uniform_int_distribution dist(0, std::numeric_limits::max()); + for (size_t i = 0; i < size; ++i) { + result[i] = dist(gen); + } + return result; +} + +SEASTAR_TEST_CASE(test_builder_across_tokens_with_large_partitions) { + return do_with_cql_env_thread([] (cql_test_env& e) { + auto gen = random_generator(); + + e.execute_cql("create table cf (p blob, c int, v int, primary key (p, c))").get(); + auto s = e.local_db().find_schema("ks", "cf"); + + auto make_key = [&] (auto) { return to_hex(random_bytes(128, gen)); }; + for (auto&& k : boost::irange(0, 4) | boost::adaptors::transformed(make_key)) { + for (auto i = 0; i < 1000; ++i) { + e.execute_cql(sprint("insert into cf (p, c, v) values (0x%s, %d, 0)", k, i)).get(); + } + } + + auto f1 = e.local_view_builder().wait_until_built("ks", "vcf1", lowres_clock::now() + 60s); + auto f2 = e.local_view_builder().wait_until_built("ks", "vcf2", lowres_clock::now() + 30s); + + e.execute_cql("create materialized view vcf1 as select * from cf " + "where p is 
not null and c is not null and v is not null " + "primary key (v, c, p)").get(); + + sleep(1s).get(); + + e.execute_cql("create materialized view vcf2 as select * from cf " + "where p is not null and c is not null and v is not null " + "primary key (v, p, c)").get(); + + f2.get(); + f1.get(); + auto built = db::system_keyspace::load_built_views().get0(); + BOOST_REQUIRE_EQUAL(built.size(), 2); + + auto msg = e.execute_cql("select count(*) from vcf1 where v = 0").get0(); + assert_that(msg).is_rows().with_size(1); + assert_that(msg).is_rows().with_rows({{{long_type->decompose(4000L)}}}); + + msg = e.execute_cql("select count(*) from vcf2 where v = 0").get0(); + assert_that(msg).is_rows().with_size(1); + assert_that(msg).is_rows().with_rows({{{long_type->decompose(4000L)}}}); + }); +} + +SEASTAR_TEST_CASE(test_builder_across_tokens_with_small_partitions) { + return do_with_cql_env_thread([] (cql_test_env& e) { + auto gen = random_generator(); + + e.execute_cql("create table cf (p blob, c int, v int, primary key (p, c))").get(); + auto s = e.local_db().find_schema("ks", "cf"); + + auto make_key = [&] (auto) { return to_hex(random_bytes(128, gen)); }; + for (auto&& k : boost::irange(0, 1000) | boost::adaptors::transformed(make_key)) { + for (auto i = 0; i < 4; ++i) { + e.execute_cql(sprint("insert into cf (p, c, v) values (0x%s, %d, 0)", k, i)).get(); + } + } + + auto f1 = e.local_view_builder().wait_until_built("ks", "vcf1", lowres_clock::now() + 60s); + auto f2 = e.local_view_builder().wait_until_built("ks", "vcf2", lowres_clock::now() + 30s); + + e.execute_cql("create materialized view vcf1 as select * from cf " + "where p is not null and c is not null and v is not null " + "primary key (v, c, p)").get(); + + sleep(1s).get(); + + e.execute_cql("create materialized view vcf2 as select * from cf " + "where p is not null and c is not null and v is not null " + "primary key (v, p, c)").get(); + + f2.get(); + f1.get(); + + auto built = 
db::system_keyspace::load_built_views().get0(); + BOOST_REQUIRE_EQUAL(built.size(), 2); + + auto msg = e.execute_cql("select count(*) from vcf1 where v = 0").get0(); + assert_that(msg).is_rows().with_size(1); + assert_that(msg).is_rows().with_rows({{{long_type->decompose(4000L)}}}); + + msg = e.execute_cql("select count(*) from vcf2 where v = 0").get0(); + assert_that(msg).is_rows().with_size(1); + assert_that(msg).is_rows().with_rows({{{long_type->decompose(4000L)}}}); + }); +} + +SEASTAR_TEST_CASE(test_builder_with_tombstones) { + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("create table cf (p int, c1 int, c2 int, v int, primary key (p, c1, c2))").get(); + + for (auto i = 0; i < 100; ++i) { + e.execute_cql(sprint("insert into cf (p, c1, c2, v) values (0, %d, %d, 1)", i % 2, i)).get(); + } + + e.execute_cql("delete from cf where p = 0 and c1 = 0").get(); + e.execute_cql("delete from cf where p = 0 and c1 = 1 and c2 >= 50 and c2 < 101").get(); + + auto f = e.local_view_builder().wait_until_built("ks", "vcf", lowres_clock::now() + 30s); + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and c1 is not null and c2 is not null and v is not null " + "primary key ((v, p), c1, c2)").get(); + + f.get(); + auto built = db::system_keyspace::load_built_views().get0(); + BOOST_REQUIRE_EQUAL(built.size(), 1); + + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_size(25); + }); +} + +SEASTAR_TEST_CASE(test_builder_with_concurrent_writes) { + return do_with_cql_env_thread([] (cql_test_env& e) { + auto gen = random_generator(); + + e.execute_cql("create table cf (p blob, c int, v int, primary key (p, c))").get(); + + const size_t rows = 6864; + const size_t rows_per_partition = 4; + const size_t partitions = rows / rows_per_partition; + + std::unordered_set keys; + while (keys.size() != partitions) { + keys.insert(to_hex(random_bytes(128, gen))); + } + + auto half = 
keys.begin(); + std::advance(half, keys.size() / 2); + auto k = keys.begin(); + for (; k != half; ++k) { + for (size_t i = 0; i < rows_per_partition; ++i) { + e.execute_cql(sprint("insert into cf (p, c, v) values (0x%s, %d, 0)", *k, i)).get(); + } + } + + auto f = e.local_view_builder().wait_until_built("ks", "vcf", lowres_clock::now() + 60s); + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and c is not null and v is not null " + "primary key (v, c, p)").get(); + + for (; k != keys.end(); ++k) { + for (size_t i = 0; i < rows_per_partition; ++i) { + e.execute_cql(sprint("insert into cf (p, c, v) values (0x%s, %d, 0)", *k, i)).get(); + } + } + + f.get(); + eventually([&] { + auto msg = e.execute_cql("select * from vcf").get0(); + assert_that(msg).is_rows().with_size(rows); + }); + }); +} + +SEASTAR_TEST_CASE(test_builder_with_concurrent_drop) { + return do_with_cql_env_thread([] (cql_test_env& e) { + auto gen = random_generator(); + + e.execute_cql("create table cf (p blob, c int, v int, primary key (p, c))").get(); + + auto make_key = [&] (auto) { return to_hex(random_bytes(128, gen)); }; + for (auto&& k : boost::irange(0, 1000) | boost::adaptors::transformed(make_key)) { + for (auto i = 0; i < 5; ++i) { + e.execute_cql(sprint("insert into cf (p, c, v) values (0x%s, %d, 0)", k, i)).get(); + } + } + + e.execute_cql("create materialized view vcf as select * from cf " + "where p is not null and c is not null and v is not null " + "primary key (v, c, p)").get(); + + e.execute_cql("drop materialized view vcf").get(); + + eventually([&] { + auto msg = e.execute_cql("select * from system.scylla_views_builds_in_progress").get0(); + assert_that(msg).is_rows().is_empty(); + msg = e.execute_cql("select * from system.built_views").get0(); + assert_that(msg).is_rows().is_empty(); + msg = e.execute_cql("select * from system.views_builds_in_progress").get0(); + assert_that(msg).is_rows().is_empty(); + msg = e.execute_cql("select * from 
system_distributed.view_build_status").get0(); + assert_that(msg).is_rows().is_empty(); + }); + }); +} diff --git a/tests/view_schema_test.cc b/tests/view_schema_test.cc index a4aaab989a..b9b67cd4a3 100644 --- a/tests/view_schema_test.cc +++ b/tests/view_schema_test.cc @@ -33,24 +33,6 @@ using namespace std::literals::chrono_literals; -template -static void eventually(EventuallySucceedingFunction&& f) { - constexpr unsigned max_attempts = 10; - unsigned attempts = 0; - while (true) { - try { - f(); - break; - } catch (...) { - if (++attempts < max_attempts) { - sleep(std::chrono::milliseconds(1 << attempts)).get0(); - } else { - throw; - } - } - } -} - SEASTAR_TEST_CASE(test_case_sensitivity) { return do_with_cql_env_thread([] (auto& e) { e.execute_cql("create table cf (theKey int, theClustering int, theValue int, primary key (theKey, theClustering));").get(); diff --git a/tests/virtual_reader_test.cc b/tests/virtual_reader_test.cc index dc49865ccc..6a8e00b95e 100644 --- a/tests/virtual_reader_test.cc +++ b/tests/virtual_reader_test.cc @@ -33,12 +33,14 @@ #include "tests/test_services.hh" #include "db/size_estimates_virtual_reader.hh" +#include "db/system_keyspace.hh" #include "core/future-util.hh" #include "cql3/query_processor.hh" #include "cql3/untyped_result_set.hh" +#include "dht/i_partitioner.hh" #include "transport/messages/result_message.hh" -SEASTAR_TEST_CASE(test_query_virtual_table) { +SEASTAR_TEST_CASE(test_query_size_estimates_virtual_table) { return do_with_cql_env([] (auto& e) { auto ranges = db::size_estimates::size_estimates_mutation_reader::get_local_ranges().get0(); auto start_token1 = utf8_type->to_string(ranges[3].start); @@ -215,3 +217,50 @@ SEASTAR_TEST_CASE(test_query_virtual_table) { }).discard_result(); }); } + +SEASTAR_TEST_CASE(test_query_view_built_progress_virtual_table) { + return do_with_cql_env_thread([] (cql_test_env& e) { + auto rand = [] { return dht::global_partitioner().get_random_token(); }; + auto next_token = rand(); + auto 
next_token_str = dht::global_partitioner().to_sstring(next_token); + db::system_keyspace::register_view_for_building("ks", "v1", rand()).get(); + db::system_keyspace::register_view_for_building("ks", "v2", rand()).get(); + db::system_keyspace::register_view_for_building("ks", "v3", rand()).get(); + db::system_keyspace::update_view_build_progress("ks", "v3", next_token).get(); + db::system_keyspace::register_view_for_building("ks", "v4", rand()).get(); + db::system_keyspace::update_view_build_progress("ks", "v4", next_token).get(); + db::system_keyspace::register_view_for_building("ks", "v5", rand()).get(); + db::system_keyspace::register_view_for_building("ks", "v6", rand()).get(); + db::system_keyspace::remove_view_build_progress_across_all_shards("ks", "v5").get(); + db::system_keyspace::remove_view_build_progress_across_all_shards("ks", "v6").get(); + auto rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks'").get0(); + assert_that(rs).is_rows().with_rows_ignore_order({ + { {utf8_type->decompose(sstring("ks"))}, {utf8_type->decompose(sstring("v1"))}, {int32_type->decompose(0)}, { } }, + { {utf8_type->decompose(sstring("ks"))}, {utf8_type->decompose(sstring("v2"))}, {int32_type->decompose(0)}, { } }, + { {utf8_type->decompose(sstring("ks"))}, {utf8_type->decompose(sstring("v3"))}, {int32_type->decompose(0)}, {utf8_type->decompose(next_token_str)} }, + { {utf8_type->decompose(sstring("ks"))}, {utf8_type->decompose(sstring("v4"))}, {int32_type->decompose(0)}, {utf8_type->decompose(next_token_str)} } + }); + rs = e.execute_cql("select * from system.views_builds_in_progress").get0(); + assert_that(rs).is_rows().with_size(4); + rs = e.execute_cql("select * from system.views_builds_in_progress limit 1").get0(); + assert_that(rs).is_rows().with_size(1); + rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name = 'v2'").get0(); + assert_that(rs).is_rows().with_size(1); + rs = 
e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name > 'v2'").get0(); + assert_that(rs).is_rows().with_size(2); + rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name >= 'v2'").get0(); + assert_that(rs).is_rows().with_size(3); + rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name < 'v2'").get0(); + assert_that(rs).is_rows().with_size(1); + rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name <= 'v2'").get0(); + assert_that(rs).is_rows().with_size(2); + rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name in ('v1', 'v2', 'v3')").get0(); + assert_that(rs).is_rows().with_size(3); + rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name >= 'v2' and view_name <= 'v2'").get0(); + assert_that(rs).is_rows().with_size(1); + rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name >= 'v2' and view_name <= 'v3'").get0(); + assert_that(rs).is_rows().with_size(2); + rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name > 'v1' and view_name < 'v2'").get0(); + assert_that(rs).is_rows().with_size(0); + }); +}