mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-31 12:06:44 +00:00
Merge "db/view: Populate views with existing base table data" from Duarte
"
This series introduces the view_builder class, a sharded service
responsible for building all defined materialized views. This process
entails walking over the existing data in a given base table, and using
it to calculate and insert the respective entries for one or more views.

The view_builder uses the migration_manager to subscribe to schema
change events, and to update its bookkeeping accordingly. We prefer this
to having the database call into the view_builder, as that would
create a cyclic dependency.

We serialize changes to the views of a particular base table, such
that schema changes do not interfere with the view building process.

We employ a flat_mutation_reader for each base table for which we're
building views.

We consume from the reader associated with each base table until all
its views are built. If the reader reaches the end and there are
incomplete views, then a view was added while others were being built.
When a view is added, we restart the reader at the beginning of the
current token, rather than at the beginning of the token range. Then,
when we exhaust the reader, we simply create a new one for the whole
token range, and resume building the pending views.

We aim to be resource-conscious. On a given shard, at any given moment,
we consume from at most one reader. We also strive for fairness, in that
each build step inserts entries for the views of a different base. Each
build step reads and generates updates for batch_size rows. We lack a
controller, which could potentially allow us to go faster (by executing
multiple steps at the same time, or consuming more rows per batch), and
which would also apply backpressure, so we could, for example, delay
executing a build step.

Interaction with the system tables:
  - When we start building a view, we add an entry to the
    scylla_views_builds_in_progress system table. If the node restarts
    at this point, we'll consider these newly inserted views as having
    made no progress, and we'll treat them as new views;
  - When we finish a build step, we update the progress of the views
    that we built during this step by writing the next token to the
    scylla_views_builds_in_progress table. If the node restarts here,
    we'll start building the views at the token in the next_token
    column;
  - When we finish building a view, we mark it as completed in the
    built views system table, and remove it from the in-progress system
    table. Under failure, the following can happen:
        * When we fail to mark the view as built, we'll redo the last
          step upon node reboot;
        * When we fail to delete the in-progress record, upon reboot
          we'll remove this record.
    A view is marked as completed only when all shards have finished
    their share of the work, that is, if a view is not built, then all
    shards will still have an entry in the in-progress system table;
  - A view that one shard has finished building, while other shards
    have not, remains in the in-progress system table, with
    first_token == next_token.

Interaction with the distributed system tables:
  - When we start building a view, we mark the view build as being
    in-progress;
  - When we finish building a view, we mark the view as being built.
    Upon failure, we ensure that if the view is in the in-progress
    system table, then it may not have been written to this table. We
    don't load the built views from this table when starting. When
    starting, the following happens:
        * If the view is in the system.built_views table and not the
          in-progress system table, then it will be in this one;
        * If the view is in the system.built_views table and not in
          this one, it will still be in the in-progress system table -
          we detect this and mark it as built in this table too,
          keeping the invariant;
        * If the view is in this table but not in system.built_views,
          then it will also be in the in-progress system table - we
          don't detect this and will redo the missing step, for
          simplicity.

View building is necessarily a sharded process. That means that on
restart, if the number of shards has changed, we need to calculate
the most conservative token range that has been built, and build
the remainder.

When building view updates, we consider that everything is new and
nothing pre-existing is there (which means no tombstones will be sent
out to the paired view replicas).

Tests:
    unit (debug)
    dtest (materialized_view_test.py(smp=1, smp=2))
"
* 'view-building/v4' of https://github.com/duarten/scylla: (22 commits)
tests/view_build_test: Add tests for view building
tests/cql_test_env: Move eventually() to this file
tests/cql_assertions: Assert result set is not empty
tests/cql_test_env: Start the view_builder
db/view/view_builder: Allow synchronizing with the end of a build
db/view/view_builder: Actually build views
flat_mutation_reader: Make reader from mutation fragments
db/view/view_builder: React to schema changes
service/migration_listener: Add class for view notifications
db/view: Introduce view_builder
column_family: Add function to populate views
column_family: Allow synchronizing with in-progress writes
database: Compare view id instead of name in find_views()
database: Add get_views() function
db/view: Return a future when sending view updates
service/storage_service: Allow querying the view build status
db: Introduce system_distributed_keyspace
tests: Add unit test for build_progress_virtual_reader
db/system_keyspace: Add API for MV-related system tables
db/system_keyspace: Add virtual reader for MV in-progress build status
...
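
The fairness rule in the merge description (each build step reads up to batch_size rows and the next step serves a different base table) can be illustrated with a small standalone sketch. This is not the view_builder code: `base_build_state` and `run_build_steps` are hypothetical names, rows are reduced to a counter, and the real service is asynchronous and per-shard.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the per-base-table build state.
struct base_build_state {
    std::string base_name;
    std::size_t rows_remaining; // rows of this base table not yet processed
};

// Runs build steps until every base table is exhausted and returns the order
// in which base tables were served. Each step reads at most batch_size rows
// from a single base table; an unfinished table goes to the back of the queue,
// so consecutive steps serve different bases whenever more than one is pending.
std::vector<std::string> run_build_steps(std::deque<base_build_state> bases, std::size_t batch_size) {
    std::vector<std::string> served;
    if (batch_size == 0) {
        return served; // no progress is possible with an empty batch
    }
    while (!bases.empty()) {
        base_build_state current = std::move(bases.front());
        bases.pop_front();
        served.push_back(current.base_name);
        current.rows_remaining -= std::min(batch_size, current.rows_remaining);
        if (current.rows_remaining > 0) {
            bases.push_back(std::move(current));
        }
    }
    return served;
}
```

Only one base table is read in any given step, which mirrors the "at most one reader at a time" constraint described above.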
@@ -2129,6 +2129,41 @@
 ]
 }
 ]
+},
+{
+"path":"/storage_service/view_build_statuses/{keyspace}/{view}",
+"operations":[
+{
+"method":"GET",
+"summary":"Gets the progress of a materialized view build",
+"type":"array",
+"items":{
+"type":"mapper"
+},
+"nickname":"view_build_statuses",
+"produces":[
+"application/json"
+],
+"parameters":[
+{
+"name":"keyspace",
+"description":"The keyspace",
+"required":true,
+"allowMultiple":false,
+"type":"string",
+"paramType":"path"
+},
+{
+"name":"view",
+"description":"View name",
+"required":true,
+"allowMultiple":false,
+"type":"string",
+"paramType":"path"
+}
+]
+}
+]
 }
 ],
 "models":{
@@ -852,6 +852,15 @@ void set_storage_service(http_context& ctx, routes& r) {
 return make_ready_future<json::json_return_type>(map_to_key_value(ownership, res));
 });
 });
+
+ss::view_build_statuses.set(r, [&ctx] (std::unique_ptr<request> req) {
+auto keyspace = validate_keyspace(ctx, req->param);
+auto view = req->param["view"];
+return service::get_local_storage_service().view_build_statuses(std::move(keyspace), std::move(view)).then([] (std::unordered_map<sstring, sstring> status) {
+std::vector<storage_service_json::mapper> res;
+return make_ready_future<json::json_return_type>(map_to_key_value(std::move(status), res));
+});
+});
 }

 }
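
The handler above returns the status map as a JSON array of key/value "mapper" entries. The following standalone sketch shows that flattening step in isolation. It is an assumption-laden illustration: `key_value` and `to_key_value` are hypothetical names, not the `map_to_key_value` helper used in the diff, and the sort by key is added here only to make the output order deterministic.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical equivalent of one "mapper" entry in the REST response.
struct key_value {
    std::string key;
    std::string value;
};

// Flattens a status map into a list of key/value entries, sorted by key so
// that the result does not depend on the hash map's iteration order.
std::vector<key_value> to_key_value(const std::unordered_map<std::string, std::string>& statuses) {
    std::vector<key_value> result;
    result.reserve(statuses.size());
    for (const auto& [key, value] : statuses) {
        result.push_back({key, value});
    }
    std::sort(result.begin(), result.end(), [] (const key_value& a, const key_value& b) {
        return a.key < b.key;
    });
    return result;
}
```

The status strings used in the test ("STARTED", "SUCCESS") are the ones written by the distributed keyspace code later in this diff; the host names are made up.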
@@ -273,6 +273,7 @@ scylla_tests = [
 'tests/input_stream_test',
 'tests/virtual_reader_test',
 'tests/view_schema_test',
+'tests/view_build_test',
 'tests/counter_test',
 'tests/cell_locker_test',
 'tests/row_locker_test',
@@ -491,6 +492,7 @@ scylla_core = (['database.cc',
 'cql3/variable_specifications.cc',
 'db/consistency_level.cc',
 'db/system_keyspace.cc',
+'db/system_distributed_keyspace.cc',
 'db/schema_tables.cc',
 'db/cql_type_parser.cc',
 'db/legacy_schema_migrator.cc',
48
database.cc
@@ -2914,6 +2914,11 @@ bool database::has_schema(const sstring& ks_name, const sstring& cf_name) const
 return _ks_cf_to_uuid.count(std::make_pair(ks_name, cf_name)) > 0;
 }

+std::vector<view_ptr> database::get_views() const {
+return boost::copy_range<std::vector<view_ptr>>(get_non_system_column_families()
+| boost::adaptors::filtered([] (auto& cf) { return cf->schema()->is_view(); })
+| boost::adaptors::transformed([] (auto& cf) { return view_ptr(cf->schema()); }));
+}

 void database::create_in_memory_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
 keyspace ks(ksm, std::move(make_keyspace_config(*ksm)));
@@ -3259,7 +3264,7 @@ future<mutation> database::do_apply_counter_update(column_family& cf, const froz
 std::move(regular_columns), { }, { }, cql_serialization_format::internal(), query::max_rows);

 return do_with(std::move(slice), std::move(m), std::vector<locked_cell>(),
-[this, &cf, timeout, trace_state = std::move(trace_state)] (const query::partition_slice& slice, mutation& m, std::vector<locked_cell>& locks) mutable {
+[this, &cf, timeout, trace_state = std::move(trace_state), op = cf.write_in_progress()] (const query::partition_slice& slice, mutation& m, std::vector<locked_cell>& locks) mutable {
 tracing::trace(trace_state, "Acquiring counter locks");
 return cf.lock_counter_cells(m, timeout).then([&, m_schema = cf.schema(), trace_state = std::move(trace_state), timeout, this] (std::vector<locked_cell> lcs) mutable {
 locks = std::move(lcs);
@@ -3491,16 +3496,19 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, db::timeout_
 throw std::runtime_error(sprint("attempted to mutate using not synced schema of %s.%s, version=%s",
 s->ks_name(), s->cf_name(), s->version()));
 }
+
+// Signal to view building code that a write is in progress,
+// so it knows when new writes start being sent to a new view.
+auto op = cf.write_in_progress();
 if (cf.views().empty()) {
-return apply_with_commitlog(std::move(s), cf, std::move(uuid), m, timeout);
+return apply_with_commitlog(std::move(s), cf, std::move(uuid), m, timeout).finally([op = std::move(op)] { });
 }
 future<row_locker::lock_holder> f = cf.push_view_replica_updates(s, m);
-return f.then([this, s = std::move(s), uuid = std::move(uuid), &m, timeout] (row_locker::lock_holder lock) {
-auto& cf = find_column_family(uuid);
+return f.then([this, s = std::move(s), uuid = std::move(uuid), &m, timeout, &cf, op = std::move(op)] (row_locker::lock_holder lock) mutable {
 return apply_with_commitlog(std::move(s), cf, std::move(uuid), m, timeout).finally(
 // Hold the local lock on the base-table partition or row
 // taken before the read, until the update is done.
-[lock = std::move(lock)] { });
+[lock = std::move(lock), op = std::move(op)] { });
 });
 });
 }
@@ -4265,9 +4273,10 @@ void column_family::set_schema(schema_ptr s) {

 static std::vector<view_ptr>::iterator find_view(std::vector<view_ptr>& views, const view_ptr& v) {
 return std::find_if(views.begin(), views.end(), [&v] (auto&& e) {
-return e->cf_name() == v->cf_name();
+return e->id() == v->id();
 });
 }
+
 void column_family::add_or_update_view(view_ptr v) {
 auto existing = find_view(_views, v);
 if (existing != _views.end()) {
@@ -4317,7 +4326,7 @@ future<> column_family::generate_and_propagate_view_updates(const schema_ptr& ba
 std::move(views),
 flat_mutation_reader_from_mutations({std::move(m)}),
 std::move(existings)).then([base_token = std::move(base_token)] (auto&& updates) {
-db::view::mutate_MV(std::move(base_token), std::move(updates));
+db::view::mutate_MV(std::move(base_token), std::move(updates)).handle_exception([] (auto ignored) { });
 });
 }

@@ -4456,6 +4465,31 @@ column_family::local_base_lock(const schema_ptr& s, const dht::decorated_key& pk
 }
 }

+/**
+* Given some updates on the base table and assuming there are no pre-existing, overlapping updates,
+* generates the mutations to be applied to the base table's views, and sends them to the paired
+* view replicas. The future resolves when the updates have been acknowledged by the replicas, i.e.,
+* propagating the view updates to the view replicas happens synchronously.
+*
+* @param views the affected views which need to be updated.
+* @param base_token The token to use to match the base replica with the paired replicas.
+* @param reader the base table updates being applied, which all correspond to the base token.
+* @return a future that resolves when the updates have been acknowledged by the view replicas
+*/
+future<> column_family::populate_views(
+std::vector<view_ptr> views,
+dht::token base_token,
+flat_mutation_reader&& reader) {
+auto& schema = reader.schema();
+return db::view::generate_view_updates(
+schema,
+std::move(views),
+std::move(reader),
+{ }).then([base_token = std::move(base_token)] (auto&& updates) {
+return db::view::mutate_MV(std::move(base_token), std::move(updates));
+});
+}
+
 void column_family::set_hit_rate(gms::inet_address addr, cache_temperature rate) {
 auto& e = _cluster_cache_hit_rates[addr];
 e.rate = rate;
21
database.hh
@@ -461,6 +461,11 @@ private:
 double _cached_percentile = -1;
 lowres_clock::time_point _percentile_cache_timestamp;
 std::chrono::milliseconds _percentile_cache_value;
+
+// Phaser used to synchronize with in-progress writes. This is useful for code that,
+// after some modification, needs to ensure that new writes will see it before
+// it can proceed, such as the view building code.
+utils::phased_barrier _pending_writes_phaser;
 private:
 void update_stats_for_new_sstable(uint64_t disk_space_used_by_sstable, const std::vector<unsigned>& shards_for_the_sstable) noexcept;
 // Adds new sstable to the set of sstables
@@ -784,6 +789,14 @@ public:

 future<> run_with_compaction_disabled(std::function<future<> ()> func);

+utils::phased_barrier::operation write_in_progress() {
+return _pending_writes_phaser.start();
+}
+
+future<> await_pending_writes() {
+return _pending_writes_phaser.advance_and_await();
+}
+
 void add_or_update_view(view_ptr v);
 void remove_view(view_ptr v);
 const std::vector<view_ptr>& views() const;
@@ -798,6 +811,12 @@ public:
 uint64_t large_partition_warning_threshold_bytes() const {
 return _config.large_partition_warning_threshold_bytes;
 }
+
+future<> populate_views(
+std::vector<view_ptr>,
+dht::token base_token,
+flat_mutation_reader&&);
+
 private:
 std::vector<view_ptr> affected_views(const schema_ptr& base, const mutation& update) const;
 future<> generate_and_propagate_view_updates(const schema_ptr& base,
@@ -1273,6 +1292,8 @@ public:

 std::vector<lw_shared_ptr<column_family>> get_non_system_column_families() const;

+std::vector<view_ptr> get_views() const;
+
 const std::unordered_map<std::pair<sstring, sstring>, utils::UUID, utils::tuple_hash>&
 get_column_families_mapping() const {
 return _ks_cf_to_uuid;
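
The `write_in_progress()` / `await_pending_writes()` pair added above lets the view building code wait until every write that started before some point has finished. The toy model below sketches that idea synchronously. It is not `utils::phased_barrier`: the class, its `advance()` and `drained()` members, and the single-threaded design are assumptions made for illustration, whereas the real barrier returns a future from `advance_and_await()`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <utility>

// Toy, single-threaded model of a phased barrier (illustrative only).
class toy_phased_barrier {
    using counts = std::map<std::uint64_t, std::size_t>;
    std::uint64_t _phase = 0;
    // Number of in-flight operations, keyed by the phase in which they started.
    std::shared_ptr<counts> _in_flight = std::make_shared<counts>();
public:
    // RAII token: the operation is in progress until the token is destroyed.
    class operation {
        std::shared_ptr<counts> _in_flight;
        std::uint64_t _phase;
    public:
        operation(std::shared_ptr<counts> in_flight, std::uint64_t phase)
            : _in_flight(std::move(in_flight)), _phase(phase) {
            ++(*_in_flight)[_phase];
        }
        operation(operation&& other) noexcept
            : _in_flight(std::move(other._in_flight)), _phase(other._phase) {}
        operation(const operation&) = delete;
        operation& operator=(const operation&) = delete;
        operation& operator=(operation&&) = delete;
        ~operation() {
            // A moved-from token holds no pointer and releases nothing.
            if (_in_flight && --(*_in_flight)[_phase] == 0) {
                _in_flight->erase(_phase);
            }
        }
    };

    // Registers a new operation in the current phase.
    operation start() { return operation(_in_flight, _phase); }

    // Opens a new phase and returns the phase that was just closed.
    std::uint64_t advance() { return _phase++; }

    // True once every operation started in `phase` or earlier has finished.
    bool drained(std::uint64_t phase) const {
        return _in_flight->empty() || _in_flight->begin()->first > phase;
    }
};
```

Writes that start after `advance()` belong to the new phase and do not delay the wait, which is the property the view builder relies on: once the closed phase drains, every write still in flight has seen the newly added view.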
143
db/system_distributed_keyspace.cc
Normal file
@@ -0,0 +1,143 @@
+/*
+* Copyright (C) 2018 ScyllaDB
+*/
+
+/*
+* This file is part of Scylla.
+*
+* Scylla is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Scylla is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "db/system_distributed_keyspace.hh"
+
+#include "cql3/untyped_result_set.hh"
+#include "database.hh"
+#include "db/consistency_level_type.hh"
+#include "db/system_keyspace.hh"
+#include "schema_builder.hh"
+#include "types.hh"
+
+#include <seastar/core/reactor.hh>
+#include <seastar/core/shared_ptr.hh>
+
+#include <boost/range/adaptor/transformed.hpp>
+
+#include <optional>
+#include <vector>
+#include <experimental/optional>
+
+namespace db {
+
+schema_ptr view_build_status() {
+static thread_local auto schema = [] {
+auto id = generate_legacy_id(system_distributed_keyspace::NAME, system_distributed_keyspace::VIEW_BUILD_STATUS);
+return schema_builder(system_distributed_keyspace::NAME, system_distributed_keyspace::VIEW_BUILD_STATUS, std::experimental::make_optional(id))
+.with_column("keyspace_name", utf8_type, column_kind::partition_key)
+.with_column("view_name", utf8_type, column_kind::partition_key)
+.with_column("host_id", uuid_type, column_kind::clustering_key)
+.with_column("status", utf8_type)
+.with_version(system_keyspace::generate_schema_version(id))
+.build();
+}();
+return schema;
+}
+
+static std::vector<schema_ptr> all_tables() {
+return {
+view_build_status(),
+};
+}
+
+system_distributed_keyspace::system_distributed_keyspace(cql3::query_processor& qp, service::migration_manager& mm)
+: _qp(qp)
+, _mm(mm) {
+}
+
+future<> system_distributed_keyspace::start() {
+if (engine().cpu_id() != 0) {
+return make_ready_future<>();
+}
+
+static auto ignore_existing = [] (seastar::noncopyable_function<future<>()> func) {
+return futurize_apply(std::move(func)).handle_exception_type([] (exceptions::already_exists_exception& ignored) { });
+};
+
+// We use min_timestamp so that the default keyspace metadata will lose with any manual adjustments.
+// See issue #2129.
+return ignore_existing([this] {
+auto ksm = keyspace_metadata::new_keyspace(
+NAME,
+"org.apache.cassandra.locator.SimpleStrategy",
+{{"replication_factor", "3"}},
+true);
+return _mm.announce_new_keyspace(ksm, api::min_timestamp, false);
+}).then([this] {
+return do_with(all_tables(), [this] (std::vector<schema_ptr>& tables) {
+return do_for_each(tables, [this] (schema_ptr table) {
+return ignore_existing([this, table = std::move(table)] {
+return _mm.announce_new_column_family(std::move(table), false);
+});
+});
+});
+});
+}
+
+future<> system_distributed_keyspace::stop() {
+return make_ready_future<>();
+}
+
+future<std::unordered_map<utils::UUID, sstring>> system_distributed_keyspace::view_status(sstring ks_name, sstring view_name) const {
+return _qp.process(
+sprint("SELECT host_id, status FROM %s.%s WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS),
+db::consistency_level::ONE,
+{ std::move(ks_name), std::move(view_name) },
+false).then([this] (::shared_ptr<cql3::untyped_result_set> cql_result) {
+return boost::copy_range<std::unordered_map<utils::UUID, sstring>>(*cql_result
+| boost::adaptors::transformed([] (const cql3::untyped_result_set::row& row) {
+auto host_id = row.get_as<utils::UUID>("host_id");
+auto status = row.get_as<sstring>("status");
+return std::pair(std::move(host_id), std::move(status));
+}));
+});
+}
+
+future<> system_distributed_keyspace::start_view_build(sstring ks_name, sstring view_name) const {
+return db::system_keyspace::get_local_host_id().then([this, ks_name = std::move(ks_name), view_name = std::move(view_name)] (utils::UUID host_id) {
+return _qp.process(
+sprint("INSERT INTO %s.%s (keyspace_name, view_name, host_id, status) VALUES (?, ?, ?, ?)", NAME, VIEW_BUILD_STATUS),
+db::consistency_level::ONE,
+{ std::move(ks_name), std::move(view_name), std::move(host_id), "STARTED" },
+false).discard_result();
+});
+}
+
+future<> system_distributed_keyspace::finish_view_build(sstring ks_name, sstring view_name) const {
+return db::system_keyspace::get_local_host_id().then([this, ks_name = std::move(ks_name), view_name = std::move(view_name)] (utils::UUID host_id) {
+return _qp.process(
+sprint("UPDATE %s.%s SET status = ? WHERE keyspace_name = ? AND view_name = ? AND host_id = ?", NAME, VIEW_BUILD_STATUS),
+db::consistency_level::ONE,
+{ "SUCCESS", std::move(ks_name), std::move(view_name), std::move(host_id) },
+false).discard_result();
+});
+}
+
+future<> system_distributed_keyspace::remove_view(sstring ks_name, sstring view_name) const {
+return _qp.process(
+sprint("DELETE FROM %s.%s WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS),
+db::consistency_level::ONE,
+{ std::move(ks_name), std::move(view_name) },
+false).discard_result();
+}
+
+}
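
`start()` in the file above wraps each schema announcement in an `ignore_existing` helper, so that creating the keyspace or table a second time is treated as success rather than as an error. A synchronous sketch of the same idiom follows. Everything in it is hypothetical (`already_exists_error`, `toy_schema_registry`, and the boolean return value); the real helper works on Seastar futures and filters `exceptions::already_exists_exception`.

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical exception type playing the role of "already exists".
struct already_exists_error : std::runtime_error {
    using std::runtime_error::runtime_error;
};

// Runs func and swallows only already_exists_error; any other exception
// propagates. Returns true if func completed, false if the error was ignored.
template <typename Func>
bool ignore_existing(Func&& func) {
    try {
        std::forward<Func>(func)();
        return true;
    } catch (const already_exists_error&) {
        return false;
    }
}

// Hypothetical registry standing in for schema announcements.
class toy_schema_registry {
    std::set<std::string> _names;
public:
    void create(const std::string& name) {
        if (!_names.insert(name).second) {
            throw already_exists_error(name + " already exists");
        }
    }
    std::size_t size() const { return _names.size(); }
};
```

Filtering on one exception type keeps startup idempotent without hiding unrelated failures, which still reach the caller.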
58
db/system_distributed_keyspace.hh
Normal file
@@ -0,0 +1,58 @@
+/*
+* Copyright (C) 2018 ScyllaDB
+*/
+
+/*
+* This file is part of Scylla.
+*
+* Scylla is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Scylla is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+
+#include "bytes.hh"
+#include "cql3/query_processor.hh"
+#include "schema.hh"
+#include "service/migration_manager.hh"
+#include "utils/UUID.hh"
+
+#include <seastar/core/future.hh>
+#include <seastar/core/sstring.hh>
+
+#include <unordered_map>
+
+namespace db {
+
+class system_distributed_keyspace {
+public:
+static constexpr auto NAME = "system_distributed";
+static constexpr auto VIEW_BUILD_STATUS = "view_build_status";
+
+private:
+cql3::query_processor& _qp;
+service::migration_manager& _mm;
+
+public:
+system_distributed_keyspace(cql3::query_processor&, service::migration_manager&);
+
+future<> start();
+future<> stop();
+
+future<std::unordered_map<utils::UUID, sstring>> view_status(sstring ks_name, sstring view_name) const;
+future<> start_view_build(sstring ks_name, sstring view_name) const;
+future<> finish_view_build(sstring ks_name, sstring view_name) const;
+future<> remove_view(sstring ks_name, sstring view_name) const;
+};
+
+}
@@ -74,6 +74,7 @@
 #include "db/size_estimates_virtual_reader.hh"
 #include "db/timeout_clock.hh"
 #include "sstables/sstables.hh"
+#include "db/view/build_progress_virtual_reader.hh"
 #include "db/schema_tables.hh"

 using days = std::chrono::duration<int, std::ratio<24 * 3600>>;
@@ -642,6 +643,22 @@ schema_ptr built_views() {
 return schema;
 }

+schema_ptr scylla_views_builds_in_progress() {
+static thread_local auto schema = [] {
+auto id = generate_legacy_id(NAME, SCYLLA_VIEWS_BUILDS_IN_PROGRESS);
+return schema_builder(NAME, SCYLLA_VIEWS_BUILDS_IN_PROGRESS, stdx::make_optional(id))
+.with_column("keyspace_name", utf8_type, column_kind::partition_key)
+.with_column("view_name", utf8_type, column_kind::clustering_key)
+.with_column("cpu_id", int32_type, column_kind::clustering_key)
+.with_column("next_token", utf8_type)
+.with_column("generation_number", int32_type)
+.with_column("first_token", utf8_type)
+.with_version(generate_schema_version(id))
+.build();
+}();
+return schema;
+}
+
 } //</v3>

 namespace legacy {
@@ -1541,7 +1558,8 @@ std::vector<schema_ptr> all_tables() {
 r.insert(r.end(), { built_indexes(), hints(), batchlog(), paxos(), local(),
 peers(), peer_events(), range_xfers(),
 compactions_in_progress(), compaction_history(),
-sstable_activity(), size_estimates(),
+sstable_activity(), size_estimates(), v3::views_builds_in_progress(), v3::built_views(),
+v3::scylla_views_builds_in_progress(),
 });
 // legacy schema
 r.insert(r.end(), {
@@ -1558,6 +1576,9 @@ static void maybe_add_virtual_reader(schema_ptr s, database& db) {
 if (s.get() == size_estimates().get()) {
 db.find_column_family(s).set_virtual_reader(mutation_source(db::size_estimates::virtual_reader()));
 }
+if (s.get() == v3::views_builds_in_progress().get()) {
+db.find_column_family(s).set_virtual_reader(mutation_source(db::view::build_progress_virtual_reader(db)));
+}
 }

 static bool maybe_write_in_user_memory(schema_ptr s, database& db) {
@@ -1783,6 +1804,85 @@ mutation make_size_estimates_mutation(const sstring& ks, std::vector<range_estim
 return m_to_apply;
 }

+future<> register_view_for_building(sstring ks_name, sstring view_name, const dht::token& token) {
+sstring req = sprint("INSERT INTO system.%s (keyspace_name, view_name, generation_number, cpu_id, first_token) VALUES (?, ?, ?, ?, ?)",
+v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS);
+return execute_cql(
+std::move(req),
+std::move(ks_name),
+std::move(view_name),
+0,
+int32_t(engine().cpu_id()),
+dht::global_partitioner().to_sstring(token)).discard_result();
+}
+
+future<> update_view_build_progress(sstring ks_name, sstring view_name, const dht::token& token) {
+sstring req = sprint("INSERT INTO system.%s (keyspace_name, view_name, next_token, cpu_id) VALUES (?, ?, ?, ?)",
+v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS);
+return execute_cql(
+std::move(req),
+std::move(ks_name),
+std::move(view_name),
+dht::global_partitioner().to_sstring(token),
+int32_t(engine().cpu_id())).discard_result();
+}
+
+future<> remove_view_build_progress_across_all_shards(sstring ks_name, sstring view_name) {
+return execute_cql(
+sprint("DELETE FROM system.%s WHERE keyspace_name = ? AND view_name = ?", v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS),
+std::move(ks_name),
+std::move(view_name)).discard_result();
+}
+
+future<> mark_view_as_built(sstring ks_name, sstring view_name) {
+return execute_cql(
+sprint("INSERT INTO system.%s (keyspace_name, view_name) VALUES (?, ?)", v3::BUILT_VIEWS),
+std::move(ks_name),
+std::move(view_name)).discard_result();
+}
+
+future<> remove_built_view(sstring ks_name, sstring view_name) {
+return execute_cql(
+sprint("DELETE FROM system.%s WHERE keyspace_name = ? AND view_name = ?", v3::BUILT_VIEWS),
+std::move(ks_name),
+std::move(view_name)).discard_result();
+}
+
+future<std::vector<view_name>> load_built_views() {
+return execute_cql(sprint("SELECT * FROM system.%s", v3::BUILT_VIEWS)).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
+return boost::copy_range<std::vector<view_name>>(*cql_result
+| boost::adaptors::transformed([] (const cql3::untyped_result_set::row& row) {
+auto ks_name = row.get_as<sstring>("keyspace_name");
+auto cf_name = row.get_as<sstring>("view_name");
+return std::pair(std::move(ks_name), std::move(cf_name));
+}));
+});
+}
+
+future<std::vector<view_build_progress>> load_view_build_progress() {
+return execute_cql(sprint("SELECT keyspace_name, view_name, first_token, next_token, cpu_id FROM system.%s",
+v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS)).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
+std::vector<view_build_progress> progress;
+for (auto& row : *cql_result) {
+auto ks_name = row.get_as<sstring>("keyspace_name");
+auto cf_name = row.get_as<sstring>("view_name");
+auto first_token = dht::global_partitioner().from_sstring(row.get_as<sstring>("first_token"));
+auto next_token_sstring = row.get_opt<sstring>("next_token");
+std::optional<dht::token> next_token;
+if (next_token_sstring) {
+next_token = dht::global_partitioner().from_sstring(std::move(next_token_sstring).value());
+}
+auto cpu_id = row.get_as<int32_t>("cpu_id");
+progress.emplace_back(view_build_progress{
+view_name(std::move(ks_name), std::move(cf_name)),
+std::move(first_token),
+std::move(next_token),
+static_cast<shard_id>(cpu_id)});
+}
+return progress;
+});
+}
+
 } // namespace system_keyspace

 sstring system_keyspace_name() {
@@ -40,8 +40,10 @@

 #pragma once

+#include <optional>
 #include <unordered_map>
 #include <utility>
+#include <vector>
 #include "schema.hh"
 #include "utils/UUID.hh"
 #include "gms/inet_address.hh"
@@ -99,6 +101,7 @@ static constexpr auto SIZE_ESTIMATES = "size_estimates";
 static constexpr auto AVAILABLE_RANGES = "available_ranges";
 static constexpr auto VIEWS_BUILDS_IN_PROGRESS = "views_builds_in_progress";
 static constexpr auto BUILT_VIEWS = "built_views";
+static constexpr auto SCYLLA_VIEWS_BUILDS_IN_PROGRESS = "scylla_views_builds_in_progress";
 }

 namespace legacy {
@@ -122,6 +125,14 @@ struct range_estimates {
 int64_t mean_partition_size;
 };

+using view_name = std::pair<sstring, sstring>;
+struct view_build_progress {
+view_name view;
+dht::token first_token;
+std::optional<dht::token> next_token;
+shard_id cpu_id;
+};
+
 extern schema_ptr hints();
 extern schema_ptr batchlog();
 extern schema_ptr paxos();
@@ -652,5 +663,13 @@ future<> set_bootstrap_state(bootstrap_state state);
|
||||
*/
|
||||
mutation make_size_estimates_mutation(const sstring& ks, std::vector<range_estimates> estimates);
|
||||
|
||||
future<> register_view_for_building(sstring ks_name, sstring view_name, const dht::token& token);
|
||||
future<> update_view_build_progress(sstring ks_name, sstring view_name, const dht::token& token);
|
||||
future<> remove_view_build_progress_across_all_shards(sstring ks_name, sstring view_name);
|
||||
future<> mark_view_as_built(sstring ks_name, sstring view_name);
|
||||
future<> remove_built_view(sstring ks_name, sstring view_name);
|
||||
future<std::vector<view_name>> load_built_views();
|
||||
future<std::vector<view_build_progress>> load_view_build_progress();
|
||||
|
||||
} // namespace system_keyspace
|
||||
} // namespace db
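
Reader's note on view_build_progress: the view.cc changes in this merge read a row with no next_token as "no progress yet" and a row whose first_token equals next_token as "this shard has finished". The sketch below restates that convention in standalone form. It is only an illustration: tokens are reduced to plain integers, and the names progress_row, build_state and classify are hypothetical and not part of Scylla.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Simplified stand-in for system_keyspace::view_build_progress.
// Assumption: tokens are plain integers here; Scylla uses dht::token.
struct progress_row {
    int64_t first_token;
    std::optional<int64_t> next_token;
};

enum class build_state { not_started, in_progress, built };

// Classify a persisted row the way the loading code in view.cc appears to:
// a missing next_token means no progress, first_token == next_token means done.
build_state classify(const progress_row& row) {
    if (!row.next_token) {
        return build_state::not_started;
    }
    if (row.first_token == *row.next_token) {
        return build_state::built;
    }
    return build_state::in_progress;
}
```

For example, classify({10, 42}) yields build_state::in_progress under these assumptions.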

195
db/view/build_progress_virtual_reader.hh
Normal file
@@ -0,0 +1,195 @@
/*
 * Copyright (C) 2018 ScyllaDB
 */

/*
 * This file is part of Scylla.
 *
 * Scylla is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Scylla is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
 */

#include "database.hh"
#include "db/system_keyspace.hh"
#include "db/timeout_clock.hh"
#include "dht/i_partitioner.hh"
#include "flat_mutation_reader.hh"
#include "mutation_fragment.hh"
#include "mutation_reader.hh"
#include "query-request.hh"
#include "schema.hh"
#include "tracing/tracing.hh"

#include <boost/range/iterator_range.hpp>

#include <iterator>
#include <memory>

namespace db::view {

// Allows a user to query the views_builds_in_progress system table
// in terms of the scylla_views_builds_in_progress one, which is
// a superset of the former. When querying, we don't have to adjust
// the clustering key, but we have to adjust the requested regular
// columns. When reading the results from the scylla_views_builds_in_progress
// table, we adjust the clustering key (we shed the cpu_id column) and map
// back the regular columns.
class build_progress_virtual_reader {
    database& _db;

    struct build_progress_reader : flat_mutation_reader::impl {
        column_id _scylla_next_token_col;
        column_id _scylla_generation_number_col;
        column_id _legacy_last_token_col;
        column_id _legacy_generation_number_col;
        const query::partition_slice& _legacy_slice;
        query::partition_slice _slice;
        flat_mutation_reader _underlying;

        build_progress_reader(
                schema_ptr legacy_schema,
                column_family& scylla_views_build_progress,
                const dht::partition_range& range,
                const query::partition_slice& slice,
                const io_priority_class& pc,
                tracing::trace_state_ptr trace_state,
                streamed_mutation::forwarding fwd,
                mutation_reader::forwarding fwd_mr)
                : flat_mutation_reader::impl(std::move(legacy_schema))
                , _scylla_next_token_col(scylla_views_build_progress.schema()->get_column_definition("next_token")->id)
                , _scylla_generation_number_col(scylla_views_build_progress.schema()->get_column_definition("generation_number")->id)
                , _legacy_last_token_col(_schema->get_column_definition("last_token")->id)
                , _legacy_generation_number_col(_schema->get_column_definition("generation_number")->id)
                , _legacy_slice(slice)
                , _slice(adjust_partition_slice())
                , _underlying(scylla_views_build_progress.make_reader(
                        scylla_views_build_progress.schema(),
                        range,
                        _slice,
                        pc,
                        std::move(trace_state),
                        fwd,
                        fwd_mr)) {
        }

        const schema& underlying_schema() const {
            return *_underlying.schema();
        }

        query::partition_slice adjust_partition_slice() {
            auto slice = _legacy_slice;
            std::vector<column_id> adjusted_columns;
            for (auto col_id : slice.regular_columns) {
                if (col_id == _legacy_last_token_col) {
                    adjusted_columns.push_back(_scylla_next_token_col);
                } else if (col_id == _legacy_generation_number_col) {
                    adjusted_columns.push_back(_scylla_generation_number_col);
                }
            }
            slice.regular_columns = std::move(adjusted_columns);
            return slice;
        }

        clustering_key adjust_ckey(clustering_key& ck) {
            if (ck.size(underlying_schema()) < 3) {
                return std::move(ck);
            }
            // Drop the cpu_id from the clustering key
            auto end = ck.begin(*_schema);
            std::advance(end, 1);
            auto r = boost::make_iterator_range(ck.begin(*_schema), std::move(end));
            return clustering_key_prefix::from_exploded(r);
        }

        virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override {
            return _underlying.fill_buffer(timeout).then([this] {
                _end_of_stream = _underlying.is_end_of_stream();
                while (!_underlying.is_buffer_empty()) {
                    auto mf = _underlying.pop_mutation_fragment();
                    if (mf.is_clustering_row()) {
                        auto scylla_in_progress_row = std::move(mf).as_clustering_row();
                        auto legacy_in_progress_row = row();
                        // Drop the first_token from the regular columns
                        scylla_in_progress_row.cells().for_each_cell([&, this] (column_id id, atomic_cell_or_collection& c) {
                            if (id == _scylla_next_token_col) {
                                legacy_in_progress_row.append_cell(_legacy_last_token_col, std::move(c));
                            } else if (id == _scylla_generation_number_col) {
                                legacy_in_progress_row.append_cell(_legacy_generation_number_col, std::move(c));
                            }
                        });
                        mf = clustering_row(
                                adjust_ckey(scylla_in_progress_row.key()),
                                std::move(scylla_in_progress_row.tomb()),
                                std::move(scylla_in_progress_row.marker()),
                                std::move(legacy_in_progress_row));
                    } else if (mf.is_range_tombstone()) {
                        auto scylla_in_progress_rt = std::move(mf).as_range_tombstone();
                        mf = range_tombstone(
                                adjust_ckey(scylla_in_progress_rt.start),
                                scylla_in_progress_rt.start_kind,
                                scylla_in_progress_rt.end,
                                scylla_in_progress_rt.end_kind,
                                scylla_in_progress_rt.tomb);
                    }
                    push_mutation_fragment(mf);
                }
            });
        }

        virtual void next_partition() override {
            _end_of_stream = false;
            clear_buffer_to_next_partition();
            if (is_buffer_empty()) {
                _underlying.next_partition();
            }
        }

        virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
            clear_buffer();
            _end_of_stream = false;
            return _underlying.fast_forward_to(pr, timeout);
        }

        virtual future<> fast_forward_to(position_range range, db::timeout_clock::time_point timeout) override {
            forward_buffer_to(range.start());
            _end_of_stream = false;
            return _underlying.fast_forward_to(std::move(range), timeout);
        }
    };

public:
    build_progress_virtual_reader(database& db)
            : _db(db) {
    }

    flat_mutation_reader operator()(
            schema_ptr s,
            const dht::partition_range& range,
            const query::partition_slice& slice,
            const io_priority_class& pc,
            tracing::trace_state_ptr trace_state,
            streamed_mutation::forwarding fwd,
            mutation_reader::forwarding fwd_mr) {
        return flat_mutation_reader(std::make_unique<build_progress_reader>(
                std::move(s),
                _db.find_column_family(s->ks_name(), system_keyspace::v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS),
                range,
                slice,
                pc,
                std::move(trace_state),
                fwd,
                fwd_mr));
    }
};

}
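
Reader's note on the virtual reader: the class comment above describes translating rows of scylla_views_builds_in_progress into the legacy views_builds_in_progress layout by shedding cpu_id from the clustering key, renaming next_token to last_token, keeping generation_number and dropping first_token. The standalone sketch below shows that mapping on a toy row type. It makes several assumptions: columns are identified by name rather than column_id, cell values are strings, the first clustering component is taken to be the view name, and simple_row and to_legacy_row are hypothetical names that do not exist in Scylla.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical, simplified row: clustering key components plus named regular cells.
struct simple_row {
    std::vector<std::string> clustering_key;
    std::map<std::string, std::string> cells;
};

// Translate a row in the scylla_views_builds_in_progress layout into the
// legacy views_builds_in_progress layout, as described by the class comment.
simple_row to_legacy_row(const simple_row& scylla_row) {
    simple_row legacy;
    // Keep only the first clustering component (assumed to be the view name);
    // this sheds the trailing cpu_id.
    if (!scylla_row.clustering_key.empty()) {
        legacy.clustering_key.push_back(scylla_row.clustering_key.front());
    }
    for (const auto& [name, value] : scylla_row.cells) {
        if (name == "next_token") {
            // The legacy table exposes this value as last_token.
            legacy.cells.emplace("last_token", value);
        } else if (name == "generation_number") {
            legacy.cells.emplace("generation_number", value);
        }
        // first_token has no legacy counterpart and is dropped.
    }
    return legacy;
}
```

The real reader performs the same renaming on mutation fragments and also rewrites the requested partition slice in the opposite direction, which this sketch leaves out.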

611
db/view/view.cc
@@ -39,22 +39,36 @@
 * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
 */

#include <vector>
#include <deque>
#include <functional>
#include <optional>
#include <unordered_set>
#include <vector>

#include <boost/range/algorithm/find_if.hpp>
#include <boost/range/algorithm/remove_if.hpp>
#include <boost/range/algorithm/transform.hpp>
#include <boost/range/adaptors.hpp>

#include <seastar/core/future-util.hh>

#include "database.hh"
#include "clustering_bounds_comparator.hh"
#include "cql3/statements/select_statement.hh"
#include "cql3/util.hh"
#include "db/view/view.hh"
#include "db/view/view_builder.hh"
#include "gms/inet_address.hh"
#include "keys.hh"
#include "locator/network_topology_strategy.hh"
#include "mutation.hh"
#include "mutation_partition.hh"
#include "service/migration_manager.hh"
#include "service/storage_service.hh"
#include "view_info.hh"

using namespace std::chrono_literals;

static logging::logger vlogger("view");

view_info::view_info(const schema& schema, const raw_view_info& raw_view_info)
@@ -791,7 +805,7 @@ get_view_natural_endpoint(const sstring& keyspace_name,
// for the writes to complete.
// FIXME: I dropped a lot of parameters the Cassandra version had,
// we may need them back: writeCommitLog, baseComplete, queryStartNanoTime.
void mutate_MV(const dht::token& base_token,
future<> mutate_MV(const dht::token& base_token,
        std::vector<mutation> mutations)
{
#if 0
@@ -823,6 +837,7 @@ void mutate_MV(const dht::token& base_token,
            () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
    // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
#endif
    auto fs = std::make_unique<std::vector<future<>>>();
    for (auto& mut : mutations) {
        auto view_token = mut.token();
        auto keyspace_name = mut.schema()->ks_name();
@@ -840,9 +855,10 @@ void mutate_MV(const dht::token& base_token,
                // do not wait for it to complete.
                // Note also that mutate_locally(mut) copies mut (in
                // frozen form) so don't need to increase its lifetime.
                service::get_local_storage_proxy().mutate_locally(mut).handle_exception([] (auto ep) {
                fs->push_back(service::get_local_storage_proxy().mutate_locally(mut).handle_exception([] (auto ep) {
                    vlogger.error("Error applying local view update: {}", ep);
                });
                    return make_exception_future<>(std::move(ep));
                }));
            } else {
                vlogger.debug("Sending view update to endpoint {}, with pending endpoints = {}", *paired_endpoint, pending_endpoints);
                // Note we don't wait for the asynchronous operation to complete
@@ -852,9 +868,10 @@ void mutate_MV(const dht::token& base_token,
                // to send the update there. Currently, we do this from *each* of
                // the base replicas, but this is probably excessive - see
                // https://issues.apache.org/jira/browse/CASSANDRA-14262/
                service::get_local_storage_proxy().send_to_endpoint(std::move(mut), *paired_endpoint, std::move(pending_endpoints), db::write_type::VIEW).handle_exception([paired_endpoint] (auto ep) {
                fs->push_back(service::get_local_storage_proxy().send_to_endpoint(std::move(mut), *paired_endpoint, std::move(pending_endpoints), db::write_type::VIEW).handle_exception([paired_endpoint] (auto ep) {
                    vlogger.error("Error applying view update to {}: {}", *paired_endpoint, ep);
                });;
                    return make_exception_future<>(std::move(ep));
                }));
            }
        } else {
#if 0
@@ -897,8 +914,588 @@ void mutate_MV(const dht::token& base_token,
        viewWriteMetrics.addNano(System.nanoTime() - startTime);
    }
#endif
    auto f = seastar::when_all_succeed(fs->begin(), fs->end());
    return f.finally([fs = std::move(fs)] { });
}

view_builder::view_builder(database& db, db::system_distributed_keyspace& sys_dist_ks, service::migration_manager& mm)
        : _db(db)
        , _sys_dist_ks(sys_dist_ks)
        , _mm(mm) {
}

future<> view_builder::start() {
    return seastar::async([this] {
        auto built = system_keyspace::load_built_views().get0();
        auto in_progress = system_keyspace::load_view_build_progress().get0();
        calculate_shard_build_step(std::move(built), std::move(in_progress)).get();
        _mm.register_listener(this);
        _current_step = _base_to_build_step.begin();
        _build_step.trigger();
    });
}

future<> view_builder::stop() {
    vlogger.info("Stopping view builder");
    _mm.unregister_listener(this);
    _as.request_abort();
    return _sem.wait().then([this] {
        _sem.broken();
        return _build_step.join();
    });
}

static query::partition_slice make_partition_slice(const schema& s) {
    query::partition_slice::option_set opts;
    opts.set(query::partition_slice::option::send_partition_key);
    opts.set(query::partition_slice::option::send_clustering_key);
    opts.set(query::partition_slice::option::send_timestamp);
    opts.set(query::partition_slice::option::send_ttl);
    return query::partition_slice(
            {query::full_clustering_range},
            { },
            boost::copy_range<std::vector<column_id>>(s.regular_columns()
                    | boost::adaptors::transformed(std::mem_fn(&column_definition::id))),
            std::move(opts));
}

view_builder::build_step& view_builder::get_or_create_build_step(utils::UUID base_id) {
    auto it = _base_to_build_step.find(base_id);
    if (it == _base_to_build_step.end()) {
        auto base = _db.find_column_family(base_id).shared_from_this();
        auto p = _base_to_build_step.emplace(base_id, build_step{base, make_partition_slice(*base->schema())});
        // Iterators could have been invalidated if there was rehashing, so just reset the cursor.
        _current_step = p.first;
        it = p.first;
    }
    return it->second;
}

void view_builder::initialize_reader_at_current_token(build_step& step) {
    step.pslice = make_partition_slice(*step.base->schema());
    step.prange = dht::partition_range(dht::ring_position::starting_at(step.current_token()), dht::ring_position::max());
    step.reader = make_local_shard_sstable_reader(
            step.base->schema(),
            make_lw_shared(sstables::sstable_set(step.base->get_sstable_set())),
            step.prange,
            step.pslice,
            default_priority_class(),
            no_resource_tracking(),
            nullptr,
            streamed_mutation::forwarding::no,
            mutation_reader::forwarding::no);
}

void view_builder::load_view_status(view_builder::view_build_status status, std::unordered_set<utils::UUID>& loaded_views) {
    if (!status.next_token) {
        // No progress was made on this view, so we'll treat it as new.
        return;
    }
    vlogger.info0("Resuming to build view {}.{} at {}", status.view->ks_name(), status.view->cf_name(), *status.next_token);
    loaded_views.insert(status.view->id());
    if (status.first_token == *status.next_token) {
        // Completed, so nothing to do for this shard. Consider the view
        // as loaded and not as a new view.
        _built_views.emplace(status.view->id());
        return;
    }
    get_or_create_build_step(status.view->view_info()->base_id()).build_status.emplace_back(std::move(status));
}

void view_builder::reshard(
        std::vector<std::vector<view_builder::view_build_status>> view_build_status_per_shard,
        std::unordered_set<utils::UUID>& loaded_views) {
    // We must reshard. We aim for a simple algorithm, a step above not starting from scratch.
    // Shards build entries at different paces, so both first and last tokens will differ. We
    // want to be conservative when selecting the range that has been built. To do that, we
    // select the intersection of all the previous shards' ranges for each view.
    struct view_ptr_hash {
        std::size_t operator()(const view_ptr& v) const noexcept {
            return std::hash<utils::UUID>()(v->id());
        }
    };
    struct view_ptr_equals {
        bool operator()(const view_ptr& v1, const view_ptr& v2) const noexcept {
            return v1->id() == v2->id();
        }
    };
    std::unordered_map<view_ptr, stdx::optional<nonwrapping_range<dht::token>>, view_ptr_hash, view_ptr_equals> my_status;
    for (auto& shard_status : view_build_status_per_shard) {
        for (auto& [view, first_token, next_token] : shard_status) {
            // We start from an open-ended range, which we'll try to restrict.
            auto& my_range = my_status.emplace(
                    std::move(view),
                    nonwrapping_range<dht::token>::make_open_ended_both_sides()).first->second;
            if (!next_token || !my_range) {
                // A previous shard made no progress, so for this view we'll start over.
                my_range = stdx::nullopt;
                continue;
            }
            if (first_token == *next_token) {
                // Completed, so don't consider this shard's progress. We know that if the view
                // is marked as in-progress, then at least one shard will have a non-full range.
                continue;
            }
            wrapping_range<dht::token> other_range(first_token, *next_token);
            if (other_range.is_wrap_around(dht::token_comparator())) {
                // The intersection of a wrapping range with a non-wrapping range may yield
                // multiple non-contiguous ranges. To avoid the complexity of dealing with more
                // than one range, we'll just take one of the intersections.
                auto [bottom_range, top_range] = other_range.unwrap();
                if (auto bottom_int = my_range->intersection(nonwrapping_range(std::move(bottom_range)), dht::token_comparator())) {
                    my_range = std::move(bottom_int);
                } else {
                    my_range = my_range->intersection(nonwrapping_range(std::move(top_range)), dht::token_comparator());
                }
            } else {
                my_range = my_range->intersection(nonwrapping_range(std::move(other_range)), dht::token_comparator());
            }
        }
    }
    view_builder::base_to_build_step_type build_step;
    for (auto& [view, opt_range] : my_status) {
        if (!opt_range) {
            continue; // Treat it as a new table.
        }
        auto start_bound = opt_range->start() ? std::move(opt_range->start()->value()) : dht::minimum_token();
        auto end_bound = opt_range->end() ? std::move(opt_range->end()->value()) : dht::minimum_token();
        auto s = view_build_status{std::move(view), std::move(start_bound), std::move(end_bound)};
        load_view_status(std::move(s), loaded_views);
    }
}
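
Reader's note on reshard(): the function above keeps, for each view, the intersection of the token ranges that the previous shards had already built, and starts over if any shard made no progress or the intersection turns out empty. The standalone sketch below restates that idea under simplifying assumptions: tokens are plain integers, every in-progress range is non-wrapping with first_token below next_token (the wrap-around branch is not modelled), ranges are treated as half-open, and token_range, shard_progress and built_intersection are hypothetical names rather than Scylla types.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <limits>
#include <optional>
#include <vector>

// Assumption: a half-open, non-wrapping range [start, end) over integer tokens.
struct token_range {
    int64_t start;
    int64_t end;
};

// Per-shard progress for one view, mirroring first_token / next_token.
struct shard_progress {
    int64_t first_token;
    std::optional<int64_t> next_token;
};

// Returns the range every previous shard had already covered,
// or nullopt when the view has to be rebuilt from scratch.
std::optional<token_range> built_intersection(const std::vector<shard_progress>& shards) {
    token_range acc{std::numeric_limits<int64_t>::min(), std::numeric_limits<int64_t>::max()};
    for (const auto& s : shards) {
        if (!s.next_token) {
            return std::nullopt; // some shard made no progress: start over
        }
        if (s.first_token == *s.next_token) {
            continue; // this shard had finished: it does not restrict the range
        }
        acc.start = std::max(acc.start, s.first_token);
        acc.end = std::min(acc.end, *s.next_token);
        if (acc.start >= acc.end) {
            return std::nullopt; // empty intersection: start over
        }
    }
    return acc;
}
```

With shards that covered [0, 50) and [10, 80), the sketch keeps [10, 50), the conservative choice the comment in reshard() describes.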

future<> view_builder::calculate_shard_build_step(
        std::vector<system_keyspace::view_name> built,
        std::vector<system_keyspace::view_build_progress> in_progress) {
    // Shard 0 makes cleanup changes to the system tables, but none that could conflict
    // with the other shards; everyone is thus able to proceed independently.
    auto bookkeeping_ops = std::make_unique<std::vector<future<>>>();
    auto base_table_exists = [&, this] (const view_ptr& view) {
        // This is a safety check in case this node missed a create MV statement
        // but got a drop table for the base, and another node didn't get the
        // drop notification and sent us the view schema.
        try {
            _db.find_schema(view->view_info()->base_id());
            return true;
        } catch (const no_such_column_family&) {
            return false;
        }
    };
    auto maybe_fetch_view = [&, this] (system_keyspace::view_name& name) {
        try {
            auto s = _db.find_schema(name.first, name.second);
            if (s->is_view()) {
                auto view = view_ptr(std::move(s));
                if (base_table_exists(view)) {
                    return view;
                }
            }
            // The view was dropped and a table was re-created with the same name,
            // but the write to the view-related system tables didn't make it.
        } catch (const no_such_column_family&) {
            // Fall-through
        }
        if (engine().cpu_id() == 0) {
            bookkeeping_ops->push_back(_sys_dist_ks.remove_view(name.first, name.second));
            bookkeeping_ops->push_back(system_keyspace::remove_built_view(name.first, name.second));
            bookkeeping_ops->push_back(
                    system_keyspace::remove_view_build_progress_across_all_shards(
                            std::move(name.first),
                            std::move(name.second)));
        }
        return view_ptr(nullptr);
    };

    auto built_views = boost::copy_range<std::unordered_set<utils::UUID>>(built
            | boost::adaptors::transformed(maybe_fetch_view)
            | boost::adaptors::filtered([] (const view_ptr& v) { return bool(v); })
            | boost::adaptors::transformed([] (const view_ptr& v) { return v->id(); }));

    std::vector<std::vector<view_build_status>> view_build_status_per_shard;
    for (auto& [view_name, first_token, next_token_opt, cpu_id] : in_progress) {
        if (auto view = maybe_fetch_view(view_name)) {
            if (built_views.find(view->id()) != built_views.end()) {
                if (engine().cpu_id() == 0) {
                    auto f = _sys_dist_ks.finish_view_build(std::move(view_name.first), std::move(view_name.second)).then([view = std::move(view)] {
                        // The callee takes (ks_name, view_name); return the future so that failures propagate.
                        return system_keyspace::remove_view_build_progress_across_all_shards(view->ks_name(), view->cf_name());
                    });
                    bookkeeping_ops->push_back(std::move(f));
                }
                continue;
            }
            view_build_status_per_shard.resize(std::max(view_build_status_per_shard.size(), size_t(cpu_id + 1)));
            view_build_status_per_shard[cpu_id].emplace_back(view_build_status{
                    std::move(view),
                    std::move(first_token),
                    std::move(next_token_opt)});
        }
    }

    std::unordered_set<utils::UUID> loaded_views;
    if (view_build_status_per_shard.size() != smp::count) {
        reshard(std::move(view_build_status_per_shard), loaded_views);
    } else if (!view_build_status_per_shard.empty()) {
        for (auto& status : view_build_status_per_shard[engine().cpu_id()]) {
            load_view_status(std::move(status), loaded_views);
        }
    }

    for (auto& [_, build_step] : _base_to_build_step) {
        boost::sort(build_step.build_status, [] (view_build_status s1, view_build_status s2) {
            return *s1.next_token < *s2.next_token;
        });
        if (!build_step.build_status.empty()) {
            build_step.current_key = dht::decorated_key{*build_step.build_status.front().next_token, partition_key::make_empty()};
        }
    }

    auto all_views = _db.get_views();
    auto is_new = [&] (const view_ptr& v) {
        return base_table_exists(v) && loaded_views.find(v->id()) == loaded_views.end()
                && built_views.find(v->id()) == built_views.end();
    };
    for (auto&& view : all_views | boost::adaptors::filtered(is_new)) {
        bookkeeping_ops->push_back(add_new_view(view, get_or_create_build_step(view->view_info()->base_id())));
    }

    for (auto& [_, build_step] : _base_to_build_step) {
        initialize_reader_at_current_token(build_step);
    }

    auto f = seastar::when_all_succeed(bookkeeping_ops->begin(), bookkeeping_ops->end());
    return f.handle_exception([bookkeeping_ops = std::move(bookkeeping_ops)] (std::exception_ptr ep) {
        vlogger.error("Failed to update materialized view bookkeeping ({}), continuing anyway.", ep);
    });
}

future<> view_builder::add_new_view(view_ptr view, build_step& step) {
    vlogger.info0("Building view {}.{}, starting at token {}", view->ks_name(), view->cf_name(), step.current_token());
    step.build_status.emplace(step.build_status.begin(), view_build_status{view, step.current_token(), std::nullopt});
    return when_all_succeed(
            system_keyspace::register_view_for_building(view->ks_name(), view->cf_name(), step.current_token()),
            _sys_dist_ks.start_view_build(view->ks_name(), view->cf_name()));
}

static future<> flush_base(lw_shared_ptr<column_family> base, abort_source& as) {
    struct empty_state { };
    return exponential_backoff_retry::do_until_value(1s, 1min, as, [base = std::move(base)] {
        return base->flush().then_wrapped([base] (future<> f) -> stdx::optional<empty_state> {
            if (f.failed()) {
                vlogger.error("Error flushing base table {}.{}: {}; retrying", base->schema()->ks_name(), base->schema()->cf_name(), f.get_exception());
                return { };
            }
            return { empty_state{} };
        });
    }).discard_result();
}

void view_builder::on_create_view(const sstring& ks_name, const sstring& view_name) {
    with_semaphore(_sem, 1, [ks_name, view_name, this] {
        auto view = view_ptr(_db.find_schema(ks_name, view_name));
        auto& step = get_or_create_build_step(view->view_info()->base_id());
        return step.base->await_pending_writes().then([this, &step] {
            return flush_base(step.base, _as);
        }).then([this, view, &step] () mutable {
            // This resets the build step to the current token. It may result in views currently
            // being built to receive duplicate updates, but it simplifies things as we don't have
            // to keep around a list of new views to build the next time the reader crosses a token
            // threshold.
            initialize_reader_at_current_token(step);
            return add_new_view(view, step).then_wrapped([this, view] (future<>&& f) {
                if (f.failed()) {
                    vlogger.error("Error setting up view for building {}.{}: {}", view->ks_name(), view->cf_name(), f.get_exception());
                }
                _build_step.trigger();
            });
        });
    }).handle_exception_type([] (no_such_column_family&) { });
}

void view_builder::on_update_view(const sstring& ks_name, const sstring& view_name, bool) {
    with_semaphore(_sem, 1, [ks_name, view_name, this] {
        auto view = view_ptr(_db.find_schema(ks_name, view_name));
        auto step_it = _base_to_build_step.find(view->view_info()->base_id());
        if (step_it == _base_to_build_step.end()) {
            return; // In case all the views for this CF have finished building already.
        }
        auto status_it = boost::find_if(step_it->second.build_status, [view] (const view_build_status& bs) {
            return bs.view->id() == view->id();
        });
        if (status_it != step_it->second.build_status.end()) {
            status_it->view = std::move(view);
        }
    }).handle_exception_type([] (no_such_column_family&) { });
}

void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name) {
    vlogger.info0("Stopping to build view {}.{}", ks_name, view_name);
    with_semaphore(_sem, 1, [ks_name, view_name, this] {
        // The view is absent from the database at this point, so find it by brute force.
        ([&, this] {
            for (auto& [_, step] : _base_to_build_step) {
                if (step.build_status.empty() || step.build_status.front().view->ks_name() != ks_name) {
                    continue;
                }
                for (auto it = step.build_status.begin(); it != step.build_status.end(); ++it) {
                    if (it->view->cf_name() == view_name) {
                        _built_views.erase(it->view->id());
                        step.build_status.erase(it);
                        return;
                    }
                }
            }
        })();
        if (engine().cpu_id() != 0) {
            return make_ready_future();
        }
        return when_all_succeed(
                system_keyspace::remove_view_build_progress_across_all_shards(ks_name, view_name),
                system_keyspace::remove_built_view(ks_name, view_name),
                _sys_dist_ks.remove_view(ks_name, view_name)).handle_exception([ks_name, view_name] (std::exception_ptr ep) {
            vlogger.warn("Failed to cleanup view {}.{}: {}", ks_name, view_name, ep);
        });
    });
}

future<> view_builder::do_build_step() {
    return seastar::async([this] {
        exponential_backoff_retry r(1s, 1min);
        while (!_base_to_build_step.empty() && !_as.abort_requested()) {
            auto units = get_units(_sem, 1).get0();
            try {
                execute(_current_step->second, exponential_backoff_retry(1s, 1min));
                r.reset();
            } catch (const abort_requested_exception&) {
                return;
            } catch (...) {
                auto base = _current_step->second.base->schema();
                vlogger.warn("Error executing build step for base {}.{}: {}", base->ks_name(), base->cf_name(), std::current_exception());
                r.retry(_as).get();
                initialize_reader_at_current_token(_current_step->second);
            }
            if (_current_step->second.build_status.empty()) {
                _current_step = _base_to_build_step.erase(_current_step);
            } else {
                ++_current_step;
            }
            if (_current_step == _base_to_build_step.end()) {
                _current_step = _base_to_build_step.begin();
            }
        }
    });
}

// Called in the context of a seastar::thread.
class view_builder::consumer {
public:
    struct built_views {
        build_step& step;
        std::vector<view_build_status> views;

        built_views(build_step& step)
                : step(step) {
        }

        built_views(built_views&& other)
                : step(other.step)
                , views(std::move(other.views)) {
        }

        ~built_views() {
            for (auto&& status : views) {
                vlogger.debug("Putting view {} back in the build queue", status.view->cf_name());
                // Use step.current_token(), which may have wrapped around and become < first_token.
                step.build_status.emplace_back(view_build_status{std::move(status.view), step.current_token(), step.current_token()});
            }
        }

        void release() {
            views.clear();
        }
    };

private:
    view_builder& _builder;
    build_step& _step;
    built_views _built_views;
    std::vector<view_ptr> _views_to_build;
    std::deque<mutation_fragment> _fragments;

public:
    consumer(view_builder& builder, build_step& step)
            : _builder(builder)
            , _step(step)
            , _built_views{step} {
        if (!step.current_key.key().is_empty(*_step.reader.schema())) {
            load_views_to_build();
        }
    }

    void load_views_to_build() {
        for (auto&& vs : _step.build_status) {
            if (_step.current_token() >= vs.next_token) {
                if (partition_key_matches(*_step.reader.schema(), *vs.view->view_info(), _step.current_key)) {
                    _views_to_build.push_back(vs.view);
                }
                if (vs.next_token || _step.current_token() != vs.first_token) {
                    vs.next_token = _step.current_key.token();
                }
            } else {
                break;
            }
        }
    }

    void check_for_built_views() {
        for (auto it = _step.build_status.begin(); it != _step.build_status.end();) {
            // A view starts being built at token t1. Due to resharding, that may not necessarily be a
            // shard-owned token. We finish building the view when the next_token to build is just before
            // (or at) the first token, but the shard-owned current token is after (or at) the first token.
            // In the system tables, we set first_token = next_token to signal the completion of the build
            // process in case of a restart.
            if (it->next_token && *it->next_token <= it->first_token && _step.current_token() >= it->first_token) {
                _built_views.views.push_back(std::move(*it));
                it = _step.build_status.erase(it);
            } else {
                ++it;
            }
        }
    }

    stop_iteration consume_new_partition(const dht::decorated_key& dk) {
        _step.current_key = std::move(dk);
        check_for_built_views();
        _views_to_build.clear();
        load_views_to_build();
        return stop_iteration(_views_to_build.empty());
    }

    stop_iteration consume(tombstone) {
        return stop_iteration::no;
    }

    stop_iteration consume(static_row&&, tombstone, bool) {
        return stop_iteration::no;
    }

    stop_iteration consume(clustering_row&& cr, row_tombstone, bool) {
        if (_views_to_build.empty() || _builder._as.abort_requested()) {
            return stop_iteration::yes;
        }

        _fragments.push_back(std::move(cr));
        return stop_iteration::no;
    }

    stop_iteration consume(range_tombstone&&) {
        return stop_iteration::no;
    }

    stop_iteration consume_end_of_partition() {
        _builder._as.check();
        if (!_fragments.empty()) {
            _fragments.push_front(partition_start(_step.current_key, tombstone()));
            _step.base->populate_views(
                    _views_to_build,
                    _step.current_token(),
                    make_flat_mutation_reader_from_fragments(_step.base->schema(), std::move(_fragments))).get();
            _fragments.clear();
        }
        return stop_iteration(_step.build_status.empty());
    }

    built_views consume_end_of_stream() {
        if (vlogger.is_enabled(log_level::debug)) {
            auto view_names = boost::copy_range<std::vector<sstring>>(
                    _views_to_build | boost::adaptors::transformed([](auto v) {
                        return v->cf_name();
                    }));
            vlogger.debug("Completed build step for base {}.{}, at token {}; views={}", _step.base->schema()->ks_name(),
                    _step.base->schema()->cf_name(), _step.current_token(), view_names);
        }
        if (_step.reader.is_end_of_stream() && _step.reader.is_buffer_empty()) {
            _step.current_key = {dht::minimum_token(), partition_key::make_empty()};
            for (auto&& vs : _step.build_status) {
                vs.next_token = dht::minimum_token();
|
||||
}
|
||||
_builder.initialize_reader_at_current_token(_step);
|
||||
check_for_built_views();
|
||||
}
|
||||
return std::move(_built_views);
|
||||
}
|
||||
};
|
||||
|
||||
// Called in the context of a seastar::thread.
|
||||
void view_builder::execute(build_step& step, exponential_backoff_retry r) {
|
||||
auto consumer = compact_for_query<emit_only_live_rows::yes, view_builder::consumer>(
|
||||
*step.reader.schema(),
|
||||
gc_clock::now(),
|
||||
step.pslice,
|
||||
batch_size,
|
||||
query::max_partitions,
|
||||
view_builder::consumer{*this, step});
|
||||
consumer.consume_new_partition(step.current_key); // Initialize the state in case we're resuming a partition
|
||||
auto built = step.reader.consume_in_thread(std::move(consumer));
|
||||
|
||||
_as.check();
|
||||
|
||||
std::vector<future<>> bookkeeping_ops;
|
||||
bookkeeping_ops.reserve(built.views.size() + step.build_status.size());
|
||||
for (auto& [view, first_token, _] : built.views) {
|
||||
bookkeeping_ops.push_back(maybe_mark_view_as_built(view, first_token));
|
||||
}
|
||||
built.release();
|
||||
for (auto& [view, _, next_token] : step.build_status) {
|
||||
if (next_token) {
|
||||
bookkeeping_ops.push_back(
|
||||
system_keyspace::update_view_build_progress(view->ks_name(), view->cf_name(), *next_token));
|
||||
}
|
||||
}
|
||||
seastar::when_all_succeed(bookkeeping_ops.begin(), bookkeeping_ops.end()).handle_exception([] (std::exception_ptr ep) {
|
||||
vlogger.error("Failed to update materialized view bookkeeping ({}), continuing anyway.", ep);
|
||||
}).get();
|
||||
}
|
||||
|
||||
future<> view_builder::maybe_mark_view_as_built(view_ptr view, dht::token next_token) {
|
||||
_built_views.emplace(view->id());
|
||||
vlogger.debug("Shard finished building view {}.{}", view->ks_name(), view->cf_name());
|
||||
return container().map_reduce0(
|
||||
[view_id = view->id()] (view_builder& builder) {
|
||||
return builder._built_views.count(view_id);
|
||||
},
|
||||
true,
|
||||
[] (bool result, bool shard_complete) {
|
||||
return result & shard_complete;
|
||||
}).then([this, view, next_token = std::move(next_token)] (bool built) {
|
||||
if (built) {
|
||||
return container().invoke_on_all([view_id = view->id()] (view_builder& builder) {
|
||||
if (builder._built_views.erase(view_id) == 0 || engine().cpu_id() != 0) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
auto view = builder._db.find_schema(view_id);
|
||||
vlogger.info("Finished building view {}.{}", view->ks_name(), view->cf_name());
|
||||
return seastar::when_all_succeed(
|
||||
system_keyspace::mark_view_as_built(view->ks_name(), view->cf_name()),
|
||||
builder._sys_dist_ks.finish_view_build(view->ks_name(), view->cf_name())).then([view] {
|
||||
return system_keyspace::remove_view_build_progress_across_all_shards(view->ks_name(), view->cf_name());
|
||||
}).then([&builder, view] {
|
||||
auto it = builder._build_notifiers.find(std::pair(view->ks_name(), view->cf_name()));
|
||||
if (it != builder._build_notifiers.end()) {
|
||||
it->second.set_value();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
return system_keyspace::update_view_build_progress(view->ks_name(), view->cf_name(), next_token);
|
||||
});
|
||||
}
|
||||
|
||||
future<> view_builder::wait_until_built(const sstring& ks_name, const sstring& view_name, lowres_clock::time_point timeout) {
|
||||
return container().invoke_on(0, [ks_name, view_name, timeout] (view_builder& builder) {
|
||||
auto v = std::pair(std::move(ks_name), std::move(view_name));
|
||||
return builder._build_notifiers[std::move(v)].get_shared_future(timeout);
|
||||
});
|
||||
}
|
||||
|
||||
} // namespace view
|
||||
} // namespace db
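The completion test in check_for_built_views() can be read in isolation. The following is a minimal sketch, assuming tokens are plain 64-bit integers; `token`, `build_progress` and `is_built` are hypothetical names for illustration and are not part of the commit.

```cpp
#include <cstdint>
#include <optional>

// Hypothetical stand-in for dht::token; real tokens are not plain integers.
using token = std::int64_t;

// Mirrors the token fields of view_build_status, without the view pointer.
struct build_progress {
    token first_token;                // token at which this shard started building
    std::optional<token> next_token;  // next token to build; empty until progress is recorded
};

// Same predicate as in check_for_built_views(): the reader has wrapped around
// (next_token is at or before first_token) and the current token has reached
// first_token again.
bool is_built(const build_progress& p, token current) {
    return p.next_token && *p.next_token <= p.first_token && current >= p.first_token;
}
```

In this model, a view that started at token 100 is not yet built while next_token is 120, and counts as built once next_token has been reset below 100 and the current token is back at 100 or beyond.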
|
||||
|
||||
|
||||
@@ -92,7 +92,7 @@ query::clustering_row_ranges calculate_affected_clustering_ranges(
|
||||
const mutation_partition& mp,
|
||||
const std::vector<view_ptr>& views);
|
||||
|
||||
void mutate_MV(const dht::token& base_token,
|
||||
future<> mutate_MV(const dht::token& base_token,
|
||||
std::vector<mutation> mutations);
|
||||
|
||||
}
|
||||
|
||||
196
db/view/view_builder.hh
Normal file
@@ -0,0 +1,196 @@
|
||||
/*
|
||||
* Copyright (C) 2018 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "database_fwd.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "keys.hh"
|
||||
#include "query-request.hh"
|
||||
#include "service/migration_listener.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "sstables/sstable_set.hh"
|
||||
#include "utils/exponential_backoff_retry.hh"
|
||||
#include "utils/serialized_action.hh"
|
||||
#include "utils/UUID.hh"
|
||||
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/lowres_clock.hh>
|
||||
#include <seastar/core/semaphore.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/core/shared_future.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
|
||||
#include <optional>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
namespace db::view {
|
||||
|
||||
/**
|
||||
* The view_builder is a sharded service responsible for building all defined materialized views.
|
||||
* This process entails walking over the existing data in a given base table, and using it to
|
||||
* calculate and insert the respective entries for one or more views.
|
||||
*
|
||||
* We employ a flat_mutation_reader for each base table for which we're building views.
|
||||
*
|
||||
* We aim to be resource-conscious. On a given shard, at any given moment, we consume at most
|
||||
* from one reader. We also strive for fairness, in that each build step inserts entries for
|
||||
* the views of a different base. Each build step reads and generates updates for batch_size rows.
|
||||
*
|
||||
* We lack a controller, which could potentially allow us to go faster (to execute multiple steps at
|
||||
* the same time, or consume more rows per batch), and also which would apply backpressure, so we
|
||||
* could, for example, delay executing a build step.
|
||||
*
|
||||
* View building is necessarily a sharded process. That means that on restart, if the number of shards
|
||||
* has changed, we need to calculate the most conservative token range that has been built, and build
|
||||
* the remainder.
|
||||
*
|
||||
* Interaction with the system tables:
|
||||
* - When we start building a view, we add an entry to the scylla_views_builds_in_progress
|
||||
* system table. If the node restarts at this point, we'll consider these newly inserted
|
||||
* views as having made no progress, and we'll treat them as new views;
|
||||
* - When we finish a build step, we update the progress of the views that we built during
|
||||
* this step by writing the next token to the scylla_views_builds_in_progress table. If
|
||||
* the node restarts here, we'll start building the views at the token in the next_token column.
|
||||
* - When we finish building a view, we mark it as completed in the built views system table, and
|
||||
* remove it from the in-progress system table. Under failure, the following can happen:
|
||||
* * When we fail to mark the view as built, we'll redo the last step upon node reboot;
|
||||
* * When we fail to delete the in-progress record, upon reboot we'll remove this record.
|
||||
* A view is marked as completed only when all shards have finished their share of the work, that is,
|
||||
* if a view is not built, then all shards will still have an entry in the in-progress system table.
|
||||
* - A view that a shard finished building, but not all other shards, remains in the in-progress system
|
||||
* table, with first_token == next_token.
|
||||
* Interaction with the distributed system table (view_build_status):
|
||||
* - When we start building a view, we mark the view build as being in-progress;
|
||||
* - When we finish building a view, we mark the view as being built. Upon failure,
|
||||
* we ensure that if the view is in the in-progress system table, then it may not
|
||||
* have been written to this table. We don't load the built views from this table
|
||||
* when starting. When starting, the following happens:
|
||||
* * If the view is in the system.built_views table and not the in-progress
|
||||
* system table, then it will be in view_build_status;
|
||||
* * If the view is in the system.built_views table and not in this one, it
|
||||
* will still be in the in-progress system table - we detect this and mark
|
||||
* it as built in this table too, keeping the invariant;
|
||||
* * If the view is in this table but not in system.built_views, then it will
|
||||
* also be in the in-progress system table - we don't detect this and will
|
||||
* redo the missing step, for simplicity.
|
||||
*/
|
||||
class view_builder final : public service::migration_listener::only_view_notifications, public seastar::peering_sharded_service<view_builder> {
|
||||
/**
|
||||
* Keeps track of the build progress for a particular view.
|
||||
* When the view is built, next_token == first_token.
|
||||
*/
|
||||
struct view_build_status final {
|
||||
view_ptr view;
|
||||
dht::token first_token;
|
||||
std::optional<dht::token> next_token;
|
||||
};
|
||||
|
||||
/**
|
||||
* Keeps track of the build progress for all the views of a particular
|
||||
* base table. Each execution of the build step comprises a query of
|
||||
* the base table for the selected range.
|
||||
*
|
||||
* We pin the set of sstables that potentially contain data that should be added to a
|
||||
* view (they are pinned by the flat_mutation_reader). Adding a view v' overwrites the
|
||||
* set of pinned sstables, regardless of there being another view v'' being built. The
|
||||
* new set will potentially contain new data already in v'', written as part of the write
|
||||
* path. We assume this case is rare and optimize for less disk space at the expense of
|
||||
* network bandwidth.
|
||||
*/
|
||||
struct build_step final {
|
||||
// Ensure we pin the column_family. It may happen that all views are removed,
|
||||
// and that the base table is too before we can detect it.
|
||||
lw_shared_ptr<column_family> base;
|
||||
query::partition_slice pslice;
|
||||
dht::partition_range prange;
|
||||
flat_mutation_reader reader{nullptr};
|
||||
dht::decorated_key current_key{dht::minimum_token(), partition_key::make_empty()};
|
||||
std::vector<view_build_status> build_status;
|
||||
|
||||
const dht::token& current_token() const {
|
||||
return current_key.token();
|
||||
}
|
||||
};
|
||||
|
||||
using base_to_build_step_type = std::unordered_map<utils::UUID, build_step>;
|
||||
|
||||
database& _db;
|
||||
db::system_distributed_keyspace& _sys_dist_ks;
|
||||
service::migration_manager& _mm;
|
||||
base_to_build_step_type _base_to_build_step;
|
||||
base_to_build_step_type::iterator _current_step = _base_to_build_step.end();
|
||||
serialized_action _build_step{std::bind(&view_builder::do_build_step, this)};
|
||||
// Ensures bookkeeping operations are serialized, meaning that while we execute
|
||||
// a build step we don't consider newly added or removed views. This simplifies
|
||||
// the algorithms. Also synchronizes an operation wrt. a call to stop().
|
||||
seastar::semaphore _sem{1};
|
||||
seastar::abort_source _as;
|
||||
// Used to coordinate between shards the conclusion of the build process for a particular view.
|
||||
std::unordered_set<utils::UUID> _built_views;
|
||||
// Used for testing.
|
||||
std::unordered_map<std::pair<sstring, sstring>, seastar::shared_promise<>, utils::tuple_hash> _build_notifiers;
|
||||
|
||||
public:
|
||||
static constexpr size_t batch_size = 128;
|
||||
|
||||
public:
|
||||
view_builder(database&, db::system_distributed_keyspace&, service::migration_manager&);
|
||||
view_builder(view_builder&&) = delete;
|
||||
|
||||
/**
|
||||
* Loads the state stored in the system tables to resume building the existing views.
|
||||
* Requires that all views have been loaded from the system tables and are accessible
|
||||
* through the database, and that the commitlog has been replayed.
|
||||
*/
|
||||
future<> start();
|
||||
|
||||
/**
|
||||
* Stops the view building process.
|
||||
*/
|
||||
future<> stop();
|
||||
|
||||
virtual void on_create_view(const sstring& ks_name, const sstring& view_name) override;
|
||||
virtual void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override;
|
||||
virtual void on_drop_view(const sstring& ks_name, const sstring& view_name) override;
|
||||
|
||||
// For tests
|
||||
future<> wait_until_built(const sstring& ks_name, const sstring& view_name, lowres_clock::time_point timeout);
|
||||
|
||||
private:
|
||||
build_step& get_or_create_build_step(utils::UUID);
|
||||
void initialize_reader_at_current_token(build_step&);
|
||||
void load_view_status(view_build_status, std::unordered_set<utils::UUID>&);
|
||||
void reshard(std::vector<std::vector<view_build_status>>, std::unordered_set<utils::UUID>&);
|
||||
future<> calculate_shard_build_step(std::vector<system_keyspace::view_name>, std::vector<system_keyspace::view_build_progress>);
|
||||
future<> add_new_view(view_ptr, build_step&);
|
||||
future<> do_build_step();
|
||||
void execute(build_step&, exponential_backoff_retry);
|
||||
future<> maybe_mark_view_as_built(view_ptr, dht::token);
|
||||
|
||||
struct consumer;
|
||||
};
|
||||
|
||||
}
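The rule that a view is marked as completed only when all shards have finished can be sketched synchronously. This is a simplified model, assuming each shard's `_built_views` set is an element of a vector; the commit itself performs the check asynchronously with map_reduce0 and invoke_on_all. `view_id`, `shard_state` and `mark_built_on_shard` are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <unordered_set>
#include <vector>

using view_id = int;                              // hypothetical stand-in for utils::UUID
using shard_state = std::unordered_set<view_id>;  // models one shard's _built_views

// Records that `shard` finished building `id`. Returns true only when every
// shard has finished; in that case the per-shard bookkeeping is cleared.
bool mark_built_on_shard(std::vector<shard_state>& shards, std::size_t shard, view_id id) {
    assert(shard < shards.size());
    shards[shard].insert(id);
    const bool all_built = std::all_of(shards.begin(), shards.end(),
            [id] (const shard_state& s) { return s.count(id) > 0; });
    if (all_built) {
        for (auto& s : shards) {
            s.erase(id);
        }
    }
    return all_built;
}
```

With three shards, the first two calls for the same view return false and the third returns true, at which point the view would be recorded in the built views system table.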
|
||||
@@ -621,3 +621,37 @@ make_flat_multi_range_reader(schema_ptr s, mutation_source source, const dht::pa
|
||||
return make_flat_mutation_reader<flat_multi_range_mutation_reader>(std::move(s), std::move(source), ranges,
|
||||
slice, pc, std::move(trace_state), fwd_mr);
|
||||
}
|
||||
|
||||
flat_mutation_reader
|
||||
make_flat_mutation_reader_from_fragments(schema_ptr schema, std::deque<mutation_fragment> fragments) {
|
||||
class reader : public flat_mutation_reader::impl {
|
||||
std::deque<mutation_fragment> _fragments;
|
||||
public:
|
||||
reader(schema_ptr schema, std::deque<mutation_fragment> fragments)
|
||||
: flat_mutation_reader::impl(std::move(schema))
|
||||
, _fragments(std::move(fragments)) {
|
||||
}
|
||||
virtual future<> fill_buffer(db::timeout_clock::time_point) override {
|
||||
while (!(_end_of_stream = _fragments.empty()) && !is_buffer_full()) {
|
||||
push_mutation_fragment(std::move(_fragments.front()));
|
||||
_fragments.pop_front();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
virtual void next_partition() override {
|
||||
clear_buffer_to_next_partition();
|
||||
if (is_buffer_empty()) {
|
||||
while (!(_end_of_stream = _fragments.empty()) && !_fragments.front().is_partition_start()) {
|
||||
_fragments.pop_front();
|
||||
}
|
||||
}
|
||||
}
|
||||
virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override {
|
||||
throw std::runtime_error("This reader can't be fast forwarded to another range.");
|
||||
}
|
||||
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
|
||||
throw std::runtime_error("This reader can't be fast forwarded to another position.");
|
||||
}
|
||||
};
|
||||
return make_flat_mutation_reader<reader>(std::move(schema), std::move(fragments));
|
||||
}
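The fill_buffer() loop above moves fragments from a deque into a bounded buffer and sets the end-of-stream flag as a side effect of the loop condition. A standalone sketch of the same loop, assuming strings in place of mutation_fragment and an explicit capacity in place of is_buffer_full(); `fragment_queue_reader` and `take_buffer` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Simplified model of the deque-backed reader; not the Seastar-based class.
class fragment_queue_reader {
    std::deque<std::string> _pending;
    std::vector<std::string> _buffer;
    std::size_t _capacity;
    bool _end_of_stream = false;
public:
    fragment_queue_reader(std::deque<std::string> fragments, std::size_t capacity)
        : _pending(std::move(fragments))
        , _capacity(capacity) {
        assert(capacity > 0);
    }

    // Moves fragments into the buffer until it is full or the source is empty.
    // The flag is refreshed on every evaluation of the loop condition.
    void fill_buffer() {
        while (!(_end_of_stream = _pending.empty()) && _buffer.size() < _capacity) {
            _buffer.push_back(std::move(_pending.front()));
            _pending.pop_front();
        }
    }

    // Hands the buffered fragments to the caller and leaves the buffer empty.
    std::vector<std::string> take_buffer() {
        return std::exchange(_buffer, {});
    }

    bool is_end_of_stream() const {
        return _end_of_stream;
    }
};
```

With three fragments and a capacity of two, the first fill yields two fragments without reaching end of stream, and the second fill yields the last fragment and sets the flag.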
|
||||
|
||||
@@ -33,6 +33,8 @@
|
||||
#include <seastar/core/thread.hh>
|
||||
#include "db/timeout_clock.hh"
|
||||
|
||||
#include <deque>
|
||||
|
||||
using seastar::future;
|
||||
|
||||
class mutation_source;
|
||||
@@ -557,6 +559,9 @@ make_flat_multi_range_reader(schema_ptr s, mutation_source source, const dht::pa
|
||||
tracing::trace_state_ptr trace_state = nullptr,
|
||||
flat_mutation_reader::partition_range_forwarding fwd_mr = flat_mutation_reader::partition_range_forwarding::yes);
|
||||
|
||||
flat_mutation_reader
|
||||
make_flat_mutation_reader_from_fragments(schema_ptr, std::deque<mutation_fragment>);
|
||||
|
||||
// Calls the consumer for each element of the reader's stream until end of stream
|
||||
// is reached or the consumer requests iteration to stop by returning stop_iteration::yes.
|
||||
// The consumer should accept mutation as the argument and return stop_iteration.
|
||||
|
||||
4
init.cc
@@ -34,8 +34,8 @@ logging::logger startlog("init");
|
||||
// duplicated in cql_test_env.cc
|
||||
// until proper shutdown is done.
|
||||
|
||||
void init_storage_service(distributed<database>& db, sharded<auth::service>& auth_service) {
|
||||
service::init_storage_service(db, auth_service).get();
|
||||
void init_storage_service(distributed<database>& db, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
|
||||
service::init_storage_service(db, auth_service, sys_dist_ks).get();
|
||||
// #293 - do not stop anything
|
||||
//engine().at_exit([] { return service::deinit_storage_service(); });
|
||||
}
|
||||
|
||||
3
init.hh
@@ -25,6 +25,7 @@
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include "auth/service.hh"
|
||||
#include "db/config.hh"
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
#include "database.hh"
|
||||
#include "log.hh"
|
||||
|
||||
@@ -36,7 +37,7 @@ extern logging::logger startlog;
|
||||
|
||||
class bad_configuration_error : public std::exception {};
|
||||
|
||||
void init_storage_service(distributed<database>& db, sharded<auth::service>&);
|
||||
void init_storage_service(distributed<database>& db, sharded<auth::service>&, sharded<db::system_distributed_keyspace>&);
|
||||
void init_ms_fd_gossiper(sstring listen_address
|
||||
, uint16_t storage_port
|
||||
, uint16_t ssl_storage_port
|
||||
|
||||
4
keys.hh
@@ -304,6 +304,10 @@ public:
|
||||
return get_compound_type(s)->end(_bytes);
|
||||
}
|
||||
|
||||
bool is_empty(const schema& s) const {
|
||||
return begin(s) == end(s);
|
||||
}
|
||||
|
||||
// Returns a range of bytes_view
|
||||
auto components() const {
|
||||
return TopLevelView::compound::element_type::components(representation());
|
||||
|
||||
17
main.cc
@@ -35,10 +35,12 @@
|
||||
#include "service/load_broadcaster.hh"
|
||||
#include "streaming/stream_session.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
#include "db/batchlog_manager.hh"
|
||||
#include "db/commitlog/commitlog.hh"
|
||||
#include "db/hints/manager.hh"
|
||||
#include "db/commitlog/commitlog_replayer.hh"
|
||||
#include "db/view/view_builder.hh"
|
||||
#include "utils/runtime.hh"
|
||||
#include "utils/file_lock.hh"
|
||||
#include "log.hh"
|
||||
@@ -457,9 +459,11 @@ int main(int ac, char** av) {
|
||||
ctx.http_server.listen(ipv4_addr{ip, api_port}).get();
|
||||
startlog.info("Scylla API server listening on {}:{} ...", api_address, api_port);
|
||||
static sharded<auth::service> auth_service;
|
||||
static sharded<db::system_distributed_keyspace> sys_dist_ks;
|
||||
supervisor::notify("initializing storage service");
|
||||
init_storage_service(db, auth_service);
|
||||
init_storage_service(db, auth_service, sys_dist_ks);
|
||||
supervisor::notify("starting per-shard database core");
|
||||
|
||||
// Note: changed from using a move here, because we want the config object intact.
|
||||
database_config dbcfg;
|
||||
auto make_sched_group = [&] (sstring name, unsigned shares) {
|
||||
@@ -614,7 +618,7 @@ int main(int ac, char** av) {
|
||||
}
|
||||
// If the same sstable is shared by several shards, it cannot be
|
||||
// deleted until all shards decide to compact it. So we want to
|
||||
// start thse compactions now. Note we start compacting only after
|
||||
// start these compactions now. Note we start compacting only after
|
||||
// all sstables in this CF were loaded on all shards - otherwise
|
||||
// we will have races between the compaction and loading processes
|
||||
// We also want to trigger regular compaction on boot.
|
||||
@@ -688,6 +692,11 @@ int main(int ac, char** av) {
|
||||
proxy.invoke_on_all([] (service::storage_proxy& local_proxy) { local_proxy.start_hints_manager(gms::get_local_gossiper().shared_from_this()); }).get();
|
||||
}
|
||||
|
||||
static sharded<db::view::view_builder> view_builder;
|
||||
supervisor::notify("starting the view builder");
|
||||
view_builder.start(std::ref(db), std::ref(sys_dist_ks), std::ref(mm)).get();
|
||||
view_builder.invoke_on_all(&db::view::view_builder::start).get();
|
||||
|
||||
supervisor::notify("starting native transport");
|
||||
service::get_local_storage_service().start_native_transport().get();
|
||||
if (start_thrift) {
|
||||
@@ -718,6 +727,10 @@ int main(int ac, char** av) {
|
||||
return service::get_local_storage_service().drain_on_shutdown();
|
||||
});
|
||||
|
||||
engine().at_exit([] {
|
||||
return view_builder.stop();
|
||||
});
|
||||
|
||||
engine().at_exit([&db] {
|
||||
return db.invoke_on_all([](auto& db) {
|
||||
return db.get_compaction_manager().stop();
|
||||
|
||||
@@ -73,6 +73,26 @@ public:
|
||||
virtual void on_drop_function(const sstring& ks_name, const sstring& function_name) = 0;
|
||||
virtual void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) = 0;
|
||||
virtual void on_drop_view(const sstring& ks_name, const sstring& view_name) = 0;
|
||||
|
||||
class only_view_notifications;
|
||||
};
|
||||
|
||||
class migration_listener::only_view_notifications : public migration_listener {
|
||||
virtual void on_create_keyspace(const sstring& ks_name) { }
|
||||
virtual void on_create_column_family(const sstring& ks_name, const sstring& cf_name) { }
|
||||
virtual void on_create_user_type(const sstring& ks_name, const sstring& type_name) { }
|
||||
virtual void on_create_function(const sstring& ks_name, const sstring& function_name) { }
|
||||
virtual void on_create_aggregate(const sstring& ks_name, const sstring& aggregate_name) { }
|
||||
virtual void on_update_keyspace(const sstring& ks_name) { }
|
||||
virtual void on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool columns_changed) { }
|
||||
virtual void on_update_user_type(const sstring& ks_name, const sstring& type_name) { }
|
||||
virtual void on_update_function(const sstring& ks_name, const sstring& function_name) { }
|
||||
virtual void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) { }
|
||||
virtual void on_drop_keyspace(const sstring& ks_name) { }
|
||||
virtual void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) { }
|
||||
virtual void on_drop_user_type(const sstring& ks_name, const sstring& type_name) { }
|
||||
virtual void on_drop_function(const sstring& ks_name, const sstring& function_name) { }
|
||||
virtual void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) { }
|
||||
};
|
||||
|
||||
}
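only_view_notifications is an adapter: it supplies empty bodies for every non-view event so that a subclass only has to implement the view callbacks. A reduced sketch of that pattern, assuming a listener with just three events; all type and member names below are hypothetical.

```cpp
#include <string>
#include <vector>

// Hypothetical, trimmed-down listener interface.
struct listener {
    virtual ~listener() = default;
    virtual void on_create_table(const std::string& name) = 0;
    virtual void on_create_view(const std::string& name) = 0;
    virtual void on_drop_view(const std::string& name) = 0;
};

// Adapter: non-view events become no-ops, view events stay pure virtual.
struct only_view_events : listener {
    void on_create_table(const std::string&) override { }
};

// A concrete listener now overrides only the view callbacks.
struct recording_listener final : only_view_events {
    std::vector<std::string> log;
    void on_create_view(const std::string& name) override { log.push_back("create " + name); }
    void on_drop_view(const std::string& name) override { log.push_back("drop " + name); }
};
```

Dispatching a table event through the base interface is silently ignored, while the view events reach the subclass.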
|
||||
|
||||
@@ -105,11 +105,12 @@ int get_generation_number() {
|
||||
return generation_number;
|
||||
}
|
||||
|
||||
storage_service::storage_service(distributed<database>& db, sharded<auth::service>& auth_service)
|
||||
storage_service::storage_service(distributed<database>& db, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks)
|
||||
: _db(db)
|
||||
, _auth_service(auth_service)
|
||||
, _replicate_action([this] { return do_replicate_to_all_cores(); })
|
||||
, _update_pending_ranges_action([this] { return do_update_pending_ranges(); }) {
|
||||
, _update_pending_ranges_action([this] { return do_update_pending_ranges(); })
|
||||
, _sys_dist_ks(sys_dist_ks) {
|
||||
sstable_read_error.connect([this] { isolate_on_error(); });
|
||||
sstable_write_error.connect([this] { isolate_on_error(); });
|
||||
general_disk_error.connect([this] { isolate_on_error(); });
|
||||
@@ -551,6 +552,12 @@ void storage_service::join_token_ring(int delay) {
|
||||
|
||||
supervisor::notify("starting tracing");
|
||||
tracing::tracing::start_tracing().get();
|
||||
|
||||
supervisor::notify("starting system distributed keyspace");
|
||||
_sys_dist_ks.start(
|
||||
std::ref(cql3::get_query_processor()),
|
||||
std::ref(service::get_migration_manager())).get();
|
||||
_sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start).get();
|
||||
} else {
|
||||
slogger.info("Startup complete, but write survey mode is active, not becoming an active ring member. Use JMX (StorageService->joinRing()) to finalize ring joining.");
|
||||
}
|
||||
@@ -1259,6 +1266,9 @@ future<> storage_service::drain_on_shutdown() {
|
||||
tracing::tracing::tracing_instance().stop().get();
|
||||
slogger.info("Drain on shutdown: tracing is stopped");
|
||||
|
||||
ss._sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::stop).get();
|
||||
slogger.info("Drain on shutdown: system distributed keyspace stopped");
|
||||
|
||||
get_storage_proxy().invoke_on_all([] (storage_proxy& local_proxy) {
|
||||
return local_proxy.stop_hints_manager();
|
||||
}).get();
|
||||
@@ -3419,5 +3429,18 @@ storage_service::get_natural_endpoints(const sstring& keyspace, const token& pos
|
||||
return _db.local().find_keyspace(keyspace).get_replication_strategy().get_natural_endpoints(pos);
|
||||
}
|
||||
|
||||
future<std::unordered_map<sstring, sstring>>
|
||||
storage_service::view_build_statuses(sstring keyspace, sstring view_name) const {
|
||||
return _sys_dist_ks.local().view_status(std::move(keyspace), std::move(view_name)).then([this] (std::unordered_map<utils::UUID, sstring> status) {
|
||||
auto& endpoint_to_host_id = get_token_metadata().get_endpoint_to_host_id_map_for_reading();
|
||||
return boost::copy_range<std::unordered_map<sstring, sstring>>(endpoint_to_host_id
|
||||
| boost::adaptors::transformed([&status] (const std::pair<inet_address, utils::UUID>& p) {
|
||||
auto it = status.find(p.second);
|
||||
auto s = it != status.end() ? std::move(it->second) : "UNKNOWN";
|
||||
return std::pair(p.first.to_sstring(), std::move(s));
|
||||
}));
|
||||
});
|
||||
}
|
||||
|
||||
} // namespace service
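view_build_statuses() joins two maps: endpoint to host id, and host id to build status, falling back to "UNKNOWN" when a host has reported nothing. A minimal sketch of that join, assuming plain strings stand in for inet_address, utils::UUID and sstring; `statuses_by_endpoint` is a hypothetical name and the status values used in the example are illustrative.

```cpp
#include <string>
#include <unordered_map>

using string_map = std::unordered_map<std::string, std::string>;

// For every known endpoint, look up the status reported under its host id.
// Endpoints whose host id has no recorded status map to "UNKNOWN".
string_map statuses_by_endpoint(const string_map& endpoint_to_host_id,
                                const string_map& status_by_host_id) {
    string_map result;
    for (const auto& [endpoint, host_id] : endpoint_to_host_id) {
        auto it = status_by_host_id.find(host_id);
        result.emplace(endpoint, it != status_by_host_id.end() ? it->second : std::string("UNKNOWN"));
    }
    return result;
}
```

The result always has one entry per endpoint, so a caller can tell apart a node that has not reported from a node that is absent from the ring.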
|
||||
|
||||
|
||||
@@ -51,6 +51,7 @@
|
||||
#include "dht/token_range_endpoints.hh"
|
||||
#include "core/sleep.hh"
|
||||
#include "gms/application_state.hh"
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
#include "core/semaphore.hh"
|
||||
#include "utils/fb_utilities.hh"
|
||||
#include "utils/serialized_action.hh"
|
||||
@@ -133,7 +134,7 @@ private:
|
||||
bool _ms_stopped = false;
|
||||
bool _stream_manager_stopped = false;
|
||||
public:
|
||||
storage_service(distributed<database>& db, sharded<auth::service>&);
|
||||
storage_service(distributed<database>& db, sharded<auth::service>&, sharded<db::system_distributed_keyspace>&);
|
||||
void isolate_on_error();
|
||||
void isolate_on_commit_error();
|
||||
|
||||
@@ -732,6 +733,7 @@ private:
|
||||
future<> do_replicate_to_all_cores();
|
||||
serialized_action _replicate_action;
|
||||
serialized_action _update_pending_ranges_action;
|
||||
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
|
||||
private:
|
||||
/**
|
||||
* Replicates token_metadata contents on shard0 instance to other shards.
|
||||
@@ -2026,6 +2028,8 @@ public:
|
||||
}
|
||||
#endif
|
||||
|
||||
future<std::unordered_map<sstring, sstring>> view_build_statuses(sstring keyspace, sstring view_name) const;
|
||||
|
||||
private:
|
||||
/**
|
||||
* Seed data to the endpoints that will be responsible for it at the future
|
||||
@@ -2307,8 +2311,8 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
inline future<> init_storage_service(distributed<database>& db, sharded<auth::service>& auth_service) {
|
||||
return service::get_storage_service().start(std::ref(db), std::ref(auth_service));
|
||||
inline future<> init_storage_service(distributed<database>& db, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
|
||||
return service::get_storage_service().start(std::ref(db), std::ref(auth_service), std::ref(sys_dist_ks));
|
||||
}
|
||||
|
||||
inline future<> deinit_storage_service() {
|
||||
|
||||
1
test.py
@@ -88,6 +88,7 @@ boost_tests = [
|
||||
'counter_test',
|
||||
'cell_locker_test',
|
||||
'view_schema_test',
|
||||
'view_build_test',
|
||||
'clustering_ranges_walker_test',
|
||||
'vint_serialization_test',
|
||||
'duration_test',
|
||||
|
||||
@@ -54,6 +54,15 @@ rows_assertions::is_empty() {
|
||||
return {*this};
|
||||
}
|
||||
|
||||
rows_assertions
|
||||
rows_assertions::is_not_empty() {
|
||||
auto row_count = _rows->rs().size();
|
||||
if (row_count == 0) {
|
||||
fail("Expected some rows, but result was empty");
|
||||
}
|
||||
return {*this};
|
||||
}
|
||||
|
||||
rows_assertions
|
||||
rows_assertions::with_row(std::initializer_list<bytes_opt> values) {
|
||||
std::vector<bytes_opt> expected_row(values);
|
||||
|
||||
@@ -33,6 +33,7 @@ public:
|
||||
rows_assertions(shared_ptr<cql_transport::messages::result_message::rows> rows);
|
||||
rows_assertions with_size(size_t size);
|
||||
rows_assertions is_empty();
|
||||
rows_assertions is_not_empty();
|
||||
rows_assertions with_row(std::initializer_list<bytes_opt> values);
|
||||
|
||||
// Verifies that the result has the following rows and only those rows, in that order.
|
||||
|
||||
@@ -39,6 +39,7 @@
|
||||
#include "tmpdir.hh"
|
||||
#include "db/query_context.hh"
|
||||
#include "test_services.hh"
|
||||
#include "db/view/view_builder.hh"
|
||||
|
||||
// TODO: remove (#293)
|
||||
#include "message/messaging_service.hh"
|
||||
@@ -47,6 +48,7 @@
|
||||
#include "service/storage_service.hh"
|
||||
#include "auth/service.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
@@ -88,6 +90,7 @@ public:
|
||||
private:
|
||||
::shared_ptr<distributed<database>> _db;
|
||||
::shared_ptr<sharded<auth::service>> _auth_service;
|
||||
::shared_ptr<sharded<db::view::view_builder>> _view_builder;
|
||||
lw_shared_ptr<tmpdir> _data_dir;
|
||||
private:
|
||||
struct core_local_state {
|
||||
@@ -112,7 +115,13 @@ private:
         return ::make_shared<service::query_state>(_core_local.local().client_state);
     }
 public:
-    single_node_cql_env(::shared_ptr<distributed<database>> db, ::shared_ptr<sharded<auth::service>> auth_service) : _db(db), _auth_service(std::move(auth_service))
+    single_node_cql_env(
+            ::shared_ptr<distributed<database>> db,
+            ::shared_ptr<sharded<auth::service>> auth_service,
+            ::shared_ptr<sharded<db::view::view_builder>> view_builder)
+            : _db(db)
+            , _auth_service(std::move(auth_service))
+            , _view_builder(std::move(view_builder))
     { }
 
     virtual future<::shared_ptr<cql_transport::messages::result_message>> execute_cql(const sstring& text) override {
@@ -255,6 +264,10 @@ public:
|
||||
return _auth_service->local();
|
||||
}
|
||||
|
||||
virtual db::view::view_builder& local_view_builder() override {
|
||||
return _view_builder->local();
|
||||
}
|
||||
|
||||
future<> start() {
|
||||
return _core_local.start(std::ref(*_auth_service));
|
||||
}
|
||||
@@ -308,9 +321,11 @@ public:
             auto stop_ms = defer([&ms] { ms.stop().get(); });
 
             auto auth_service = ::make_shared<sharded<auth::service>>();
+            auto sys_dist_ks = seastar::sharded<db::system_distributed_keyspace>();
+            auto stop_sys_dist_ks = defer([&sys_dist_ks] { sys_dist_ks.stop().get(); });
 
             auto& ss = service::get_storage_service();
-            ss.start(std::ref(*db), std::ref(*auth_service)).get();
+            ss.start(std::ref(*db), std::ref(*auth_service), std::ref(sys_dist_ks)).get();
             auto stop_storage_service = defer([&ss] { ss.stop().get(); });
 
             db->start(std::move(*cfg), database_config()).get();
@@ -369,6 +384,13 @@ public:
|
||||
auth_service->stop().get();
|
||||
});
|
||||
|
||||
auto view_builder = ::make_shared<seastar::sharded<db::view::view_builder>>();
|
||||
view_builder->start(std::ref(*db), std::ref(sys_dist_ks), std::ref(mm)).get();
|
||||
view_builder->invoke_on_all(&db::view::view_builder::start).get();
|
||||
auto stop_view_builder = defer([view_builder] {
|
||||
view_builder->stop().get();
|
||||
});
|
||||
|
||||
// Create the testing user.
|
||||
try {
|
||||
auth::role_config config;
|
||||
@@ -384,7 +406,7 @@ public:
                 // The default user may already exist if this `cql_test_env` is starting with previously populated data.
             }
 
-            single_node_cql_env env(db, auth_service);
+            single_node_cql_env env(db, auth_service, view_builder);
             env.start().get();
             auto stop_env = defer([&env] { env.stop().get(); });
 
@@ -423,12 +445,13 @@ future<> do_with_cql_env_thread(std::function<void(cql_test_env&)> func) {
 class storage_service_for_tests::impl {
     distributed<database> _db;
     sharded<auth::service> _auth_service;
+    sharded<db::system_distributed_keyspace> _sys_dist_ks;
 public:
     impl() {
         auto thread = seastar::thread_impl::get();
         assert(thread);
         netw::get_messaging_service().start(gms::inet_address("127.0.0.1"), 7000, false).get();
-        service::get_storage_service().start(std::ref(_db), std::ref(_auth_service)).get();
+        service::get_storage_service().start(std::ref(_db), std::ref(_auth_service), std::ref(_sys_dist_ks)).get();
         service::get_storage_service().invoke_on_all([] (auto& ss) {
             ss.enable_all_features();
         }).get();
@@ -38,6 +38,10 @@
|
||||
|
||||
class database;
|
||||
|
||||
namespace db::view {
|
||||
class view_builder;
|
||||
}
|
||||
|
||||
namespace auth {
|
||||
class service;
|
||||
}
|
||||
@@ -95,9 +99,28 @@ public:
|
||||
virtual distributed<cql3::query_processor> & qp() = 0;
|
||||
|
||||
virtual auth::service& local_auth_service() = 0;
|
||||
|
||||
virtual db::view::view_builder& local_view_builder() = 0;
|
||||
};
|
||||
|
||||
future<> do_with_cql_env(std::function<future<>(cql_test_env&)> func);
|
||||
future<> do_with_cql_env(std::function<future<>(cql_test_env&)> func, const db::config&);
|
||||
future<> do_with_cql_env_thread(std::function<void(cql_test_env&)> func);
|
||||
future<> do_with_cql_env_thread(std::function<void(cql_test_env&)> func, const db::config&);
|
||||
|
||||
template<typename EventuallySucceedingFunction>
|
||||
static void eventually(EventuallySucceedingFunction&& f, size_t max_attempts = 10) {
|
||||
size_t attempts = 0;
|
||||
while (true) {
|
||||
try {
|
||||
f();
|
||||
break;
|
||||
} catch (...) {
|
||||
if (++attempts < max_attempts) {
|
||||
sleep(std::chrono::milliseconds(1 << attempts)).get0();
|
||||
} else {
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
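For readers who want to try the retry-with-backoff behaviour of the `eventually` helper above outside Seastar, here is a minimal standalone sketch. It is not the code from this commit: `eventually_blocking` and `calls_until_success` are hypothetical names, and `std::this_thread::sleep_for` stands in for the Seastar `sleep(...).get0()` call. The loop structure is otherwise the same: each failure doubles the delay (2 ms, 4 ms, ...), and the last failure is rethrown once `max_attempts` calls have been made.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <stdexcept>
#include <thread>

// Hypothetical blocking variant of the helper in the diff. Assumption:
// std::this_thread::sleep_for replaces Seastar's sleep(...).get0().
template <typename F>
void eventually_blocking(F&& f, std::size_t max_attempts = 10) {
    std::size_t attempts = 0;
    while (true) {
        try {
            f();
            return;
        } catch (...) {
            if (++attempts < max_attempts) {
                // Exponential backoff: 2 ms after the first failure, then 4 ms, 8 ms, ...
                std::this_thread::sleep_for(std::chrono::milliseconds(1 << attempts));
            } else {
                // Out of attempts: propagate the most recent failure.
                throw;
            }
        }
    }
}

// Hypothetical driver: the callable fails until its `succeed_on`-th call.
// Returns how many times the callable was invoked.
inline std::size_t calls_until_success(std::size_t succeed_on, std::size_t max_attempts) {
    std::size_t calls = 0;
    eventually_blocking([&] {
        if (++calls < succeed_on) {
            throw std::runtime_error("not yet");
        }
    }, max_attempts);
    return calls;
}
```

With the default of 10 attempts the helper sleeps at most 2 + 4 + ... + 512 = 1022 ms in total before giving up, which bounds how long a test built on it can wait for a condition to become true.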
@@ -22,6 +22,7 @@
|
||||
|
||||
#include "core/reactor.hh"
|
||||
#include "core/app-template.hh"
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
#include "gms/failure_detector.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
@@ -69,7 +70,8 @@ int main(int ac, char ** av) {
             utils::fb_utilities::set_broadcast_rpc_address(listen);
             auto vv = std::make_shared<gms::versioned_value::factory>();
             locator::i_endpoint_snitch::create_snitch("SimpleSnitch").then([&auth_service, &db] {
-                return service::init_storage_service(db, auth_service);
+                sharded<db::system_distributed_keyspace> sys_dist_ks;
+                return service::init_storage_service(db, auth_service, sys_dist_ks);
             }).then([vv, listen, config] {
                 return netw::get_messaging_service().start(listen);
             }).then([config] {
@@ -31,14 +31,16 @@
 #include "service/storage_service.hh"
 #include "core/distributed.hh"
 #include "database.hh"
+#include "db/system_distributed_keyspace.hh"
 
 SEASTAR_TEST_CASE(test_boot_shutdown){
     return seastar::async([] {
         distributed<database> db;
         sharded<auth::service> auth_service;
+        sharded<db::system_distributed_keyspace> sys_dist_ks;
         utils::fb_utilities::set_broadcast_address(gms::inet_address("127.0.0.1"));
         locator::i_endpoint_snitch::create_snitch("SimpleSnitch").get();
-        service::get_storage_service().start(std::ref(db), std::ref(auth_service)).get();
+        service::get_storage_service().start(std::ref(db), std::ref(auth_service), std::ref(sys_dist_ks)).get();
         db.start().get();
         netw::get_messaging_service().start(gms::inet_address("127.0.0.1")).get();
         gms::get_failure_detector().start().get();
342
tests/view_build_test.cc
Normal file
@@ -0,0 +1,342 @@
|
||||
/*
|
||||
* Copyright (C) 2018 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <boost/test/unit_test.hpp>
|
||||
|
||||
#include "database.hh"
|
||||
#include "db/view/view_builder.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
|
||||
#include "tests/test-utils.hh"
|
||||
#include "tests/cql_test_env.hh"
|
||||
#include "tests/cql_assertions.hh"
|
||||
|
||||
using namespace std::literals::chrono_literals;
|
||||
|
||||
SEASTAR_TEST_CASE(test_builder_with_large_partition) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
e.execute_cql("create table cf (p int, c int, v int, primary key (p, c))").get();
|
||||
|
||||
for (auto i = 0; i < 1024; ++i) {
|
||||
e.execute_cql(sprint("insert into cf (p, c, v) values (0, %d, 0)", i)).get();
|
||||
}
|
||||
|
||||
auto f = e.local_view_builder().wait_until_built("ks", "vcf", lowres_clock::now() + 10s);
|
||||
e.execute_cql("create materialized view vcf as select * from cf "
|
||||
"where p is not null and c is not null and v is not null "
|
||||
"primary key (v, c, p)").get();
|
||||
|
||||
f.get();
|
||||
auto built = db::system_keyspace::load_built_views().get0();
|
||||
BOOST_REQUIRE_EQUAL(built.size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(built[0].second, sstring("vcf"));
|
||||
|
||||
auto msg = e.execute_cql("select count(*) from vcf where v = 0").get0();
|
||||
assert_that(msg).is_rows().with_size(1);
|
||||
assert_that(msg).is_rows().with_rows({{{long_type->decompose(1024L)}}});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_builder_with_multiple_partitions) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
e.execute_cql("create table cf (p int, c int, v int, primary key (p, c))").get();
|
||||
|
||||
for (auto i = 0; i < 1024; ++i) {
|
||||
e.execute_cql(sprint("insert into cf (p, c, v) values (%d, %d, 0)", i % 5, i)).get();
|
||||
}
|
||||
|
||||
auto f = e.local_view_builder().wait_until_built("ks", "vcf", lowres_clock::now() + 10s);
|
||||
e.execute_cql("create materialized view vcf as select * from cf "
|
||||
"where p is not null and c is not null and v is not null "
|
||||
"primary key (v, c, p)").get();
|
||||
|
||||
f.get();
|
||||
auto built = db::system_keyspace::load_built_views().get0();
|
||||
BOOST_REQUIRE_EQUAL(built.size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(built[0].second, sstring("vcf"));
|
||||
|
||||
auto msg = e.execute_cql("select count(*) from vcf where v = 0").get0();
|
||||
assert_that(msg).is_rows().with_size(1);
|
||||
assert_that(msg).is_rows().with_rows({{{long_type->decompose(1024L)}}});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_builder_with_multiple_partitions_of_batch_size_rows) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
e.execute_cql("create table cf (p int, c int, v int, primary key (p, c))").get();
|
||||
|
||||
for (auto i = 0; i < 1024; ++i) {
|
||||
e.execute_cql(sprint("insert into cf (p, c, v) values (%d, %d, 0)", i % db::view::view_builder::batch_size, i)).get();
|
||||
}
|
||||
|
||||
auto f = e.local_view_builder().wait_until_built("ks", "vcf", lowres_clock::now() + 10s);
|
||||
e.execute_cql("create materialized view vcf as select * from cf "
|
||||
"where p is not null and c is not null and v is not null "
|
||||
"primary key (v, c, p)").get();
|
||||
|
||||
f.get();
|
||||
auto built = db::system_keyspace::load_built_views().get0();
|
||||
BOOST_REQUIRE_EQUAL(built.size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(built[0].second, sstring("vcf"));
|
||||
|
||||
auto msg = e.execute_cql("select count(*) from vcf where v = 0").get0();
|
||||
assert_that(msg).is_rows().with_size(1);
|
||||
assert_that(msg).is_rows().with_rows({{{long_type->decompose(1024L)}}});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_builder_view_added_during_ongoing_build) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
e.execute_cql("create table cf (p int, c int, v int, primary key (p, c))").get();
|
||||
|
||||
for (auto i = 0; i < 5000; ++i) {
|
||||
e.execute_cql(sprint("insert into cf (p, c, v) values (0, %d, 0)", i)).get();
|
||||
}
|
||||
|
||||
auto f1 = e.local_view_builder().wait_until_built("ks", "vcf1", lowres_clock::now() + 60s);
|
||||
auto f2 = e.local_view_builder().wait_until_built("ks", "vcf2", lowres_clock::now() + 30s);
|
||||
|
||||
e.execute_cql("create materialized view vcf1 as select * from cf "
|
||||
"where p is not null and c is not null and v is not null "
|
||||
"primary key (v, c, p)").get();
|
||||
|
||||
sleep(1s).get();
|
||||
|
||||
e.execute_cql("create materialized view vcf2 as select * from cf "
|
||||
"where p is not null and c is not null and v is not null "
|
||||
"primary key (v, p, c)").get();
|
||||
|
||||
f2.get();
|
||||
f1.get();
|
||||
auto built = db::system_keyspace::load_built_views().get0();
|
||||
BOOST_REQUIRE_EQUAL(built.size(), 2);
|
||||
|
||||
auto msg = e.execute_cql("select count(*) from vcf1 where v = 0").get0();
|
||||
assert_that(msg).is_rows().with_size(1);
|
||||
assert_that(msg).is_rows().with_rows({{{long_type->decompose(5000L)}}});
|
||||
|
||||
msg = e.execute_cql("select count(*) from vcf2 where v = 0").get0();
|
||||
assert_that(msg).is_rows().with_size(1);
|
||||
assert_that(msg).is_rows().with_rows({{{long_type->decompose(5000L)}}});
|
||||
});
|
||||
}
|
||||
|
||||
std::mt19937 random_generator() {
|
||||
std::random_device rd;
|
||||
// In case of errors, replace the seed with a fixed value to get a deterministic run.
|
||||
auto seed = rd();
|
||||
std::cout << "Random seed: " << seed << "\n";
|
||||
return std::mt19937(seed);
|
||||
}
|
||||
|
||||
bytes random_bytes(size_t size, std::mt19937& gen) {
|
||||
bytes result(bytes::initialized_later(), size);
|
||||
static thread_local std::uniform_int_distribution<int> dist(0, std::numeric_limits<uint8_t>::max());
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
result[i] = dist(gen);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
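The two helpers above feed the blob partition keys used by the later tests: `random_bytes(128, gen)` is passed through `to_hex` and spliced into a `0x...` CQL literal. The sketch below reproduces that idea with the standard library only, so it can be compiled without Scylla. `random_hex_key` is a hypothetical name, and `std::string` stands in for Scylla's `bytes` and `sstring` types.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <random>
#include <string>

// Hypothetical stand-in for to_hex(random_bytes(size, gen)): draws `size`
// uniformly distributed bytes and returns them as 2 * size lowercase hex digits.
inline std::string random_hex_key(std::size_t size, std::mt19937& gen) {
    static const char digits[] = "0123456789abcdef";
    // uniform_int_distribution is not defined for 8-bit types, so draw an int
    // in [0, 255], as the test helper in the diff does.
    std::uniform_int_distribution<int> dist(0, std::numeric_limits<std::uint8_t>::max());
    std::string out;
    out.reserve(size * 2);
    for (std::size_t i = 0; i < size; ++i) {
        const int b = dist(gen);
        out.push_back(digits[b >> 4]);
        out.push_back(digits[b & 0xf]);
    }
    return out;
}
```

Seeding the generator with a fixed value, as the comment in `random_generator()` suggests, makes the key sequence repeatable, which helps when a failure needs to be reproduced.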
|
||||
SEASTAR_TEST_CASE(test_builder_across_tokens_with_large_partitions) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
auto gen = random_generator();
|
||||
|
||||
e.execute_cql("create table cf (p blob, c int, v int, primary key (p, c))").get();
|
||||
auto s = e.local_db().find_schema("ks", "cf");
|
||||
|
||||
auto make_key = [&] (auto) { return to_hex(random_bytes(128, gen)); };
|
||||
for (auto&& k : boost::irange(0, 4) | boost::adaptors::transformed(make_key)) {
|
||||
for (auto i = 0; i < 1000; ++i) {
|
||||
e.execute_cql(sprint("insert into cf (p, c, v) values (0x%s, %d, 0)", k, i)).get();
|
||||
}
|
||||
}
|
||||
|
||||
auto f1 = e.local_view_builder().wait_until_built("ks", "vcf1", lowres_clock::now() + 60s);
|
||||
auto f2 = e.local_view_builder().wait_until_built("ks", "vcf2", lowres_clock::now() + 30s);
|
||||
|
||||
e.execute_cql("create materialized view vcf1 as select * from cf "
|
||||
"where p is not null and c is not null and v is not null "
|
||||
"primary key (v, c, p)").get();
|
||||
|
||||
sleep(1s).get();
|
||||
|
||||
e.execute_cql("create materialized view vcf2 as select * from cf "
|
||||
"where p is not null and c is not null and v is not null "
|
||||
"primary key (v, p, c)").get();
|
||||
|
||||
f2.get();
|
||||
f1.get();
|
||||
auto built = db::system_keyspace::load_built_views().get0();
|
||||
BOOST_REQUIRE_EQUAL(built.size(), 2);
|
||||
|
||||
auto msg = e.execute_cql("select count(*) from vcf1 where v = 0").get0();
|
||||
assert_that(msg).is_rows().with_size(1);
|
||||
assert_that(msg).is_rows().with_rows({{{long_type->decompose(4000L)}}});
|
||||
|
||||
msg = e.execute_cql("select count(*) from vcf2 where v = 0").get0();
|
||||
assert_that(msg).is_rows().with_size(1);
|
||||
assert_that(msg).is_rows().with_rows({{{long_type->decompose(4000L)}}});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_builder_across_tokens_with_small_partitions) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
auto gen = random_generator();
|
||||
|
||||
e.execute_cql("create table cf (p blob, c int, v int, primary key (p, c))").get();
|
||||
auto s = e.local_db().find_schema("ks", "cf");
|
||||
|
||||
auto make_key = [&] (auto) { return to_hex(random_bytes(128, gen)); };
|
||||
for (auto&& k : boost::irange(0, 1000) | boost::adaptors::transformed(make_key)) {
|
||||
for (auto i = 0; i < 4; ++i) {
|
||||
e.execute_cql(sprint("insert into cf (p, c, v) values (0x%s, %d, 0)", k, i)).get();
|
||||
}
|
||||
}
|
||||
|
||||
auto f1 = e.local_view_builder().wait_until_built("ks", "vcf1", lowres_clock::now() + 60s);
|
||||
auto f2 = e.local_view_builder().wait_until_built("ks", "vcf2", lowres_clock::now() + 30s);
|
||||
|
||||
e.execute_cql("create materialized view vcf1 as select * from cf "
|
||||
"where p is not null and c is not null and v is not null "
|
||||
"primary key (v, c, p)").get();
|
||||
|
||||
sleep(1s).get();
|
||||
|
||||
e.execute_cql("create materialized view vcf2 as select * from cf "
|
||||
"where p is not null and c is not null and v is not null "
|
||||
"primary key (v, p, c)").get();
|
||||
|
||||
f2.get();
|
||||
f1.get();
|
||||
|
||||
auto built = db::system_keyspace::load_built_views().get0();
|
||||
BOOST_REQUIRE_EQUAL(built.size(), 2);
|
||||
|
||||
auto msg = e.execute_cql("select count(*) from vcf1 where v = 0").get0();
|
||||
assert_that(msg).is_rows().with_size(1);
|
||||
assert_that(msg).is_rows().with_rows({{{long_type->decompose(4000L)}}});
|
||||
|
||||
msg = e.execute_cql("select count(*) from vcf2 where v = 0").get0();
|
||||
assert_that(msg).is_rows().with_size(1);
|
||||
assert_that(msg).is_rows().with_rows({{{long_type->decompose(4000L)}}});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_builder_with_tombstones) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
e.execute_cql("create table cf (p int, c1 int, c2 int, v int, primary key (p, c1, c2))").get();
|
||||
|
||||
for (auto i = 0; i < 100; ++i) {
|
||||
e.execute_cql(sprint("insert into cf (p, c1, c2, v) values (0, %d, %d, 1)", i % 2, i)).get();
|
||||
}
|
||||
|
||||
e.execute_cql("delete from cf where p = 0 and c1 = 0").get();
|
||||
e.execute_cql("delete from cf where p = 0 and c1 = 1 and c2 >= 50 and c2 < 101").get();
|
||||
|
||||
auto f = e.local_view_builder().wait_until_built("ks", "vcf", lowres_clock::now() + 30s);
|
||||
e.execute_cql("create materialized view vcf as select * from cf "
|
||||
"where p is not null and c1 is not null and c2 is not null and v is not null "
|
||||
"primary key ((v, p), c1, c2)").get();
|
||||
|
||||
f.get();
|
||||
auto built = db::system_keyspace::load_built_views().get0();
|
||||
BOOST_REQUIRE_EQUAL(built.size(), 1);
|
||||
|
||||
auto msg = e.execute_cql("select * from vcf").get0();
|
||||
assert_that(msg).is_rows().with_size(25);
|
||||
});
|
||||
}
|
||||
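The expected size of 25 in `test_builder_with_tombstones` follows from the inserts and the two deletes. The loop writes 100 rows with `c1 = i % 2` and `c2 = i`; the first delete removes the 50 rows with `c1 = 0`, and the range delete removes the rows with `c1 = 1` and `c2` in [50, 101), which are the odd values 51 through 99. The sketch below replays that arithmetic on a plain in-memory set. It is only a model of the clustering keys, not Scylla code, and `surviving_rows_after_deletes` is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <iterator>
#include <set>
#include <utility>

// Models the (c1, c2) clustering keys of partition p = 0 from the test.
inline std::size_t surviving_rows_after_deletes() {
    std::set<std::pair<int, int>> rows;
    for (int i = 0; i < 100; ++i) {
        rows.insert({i % 2, i});  // insert into cf (p, c1, c2, v) values (0, i % 2, i, 1)
    }
    for (auto it = rows.begin(); it != rows.end();) {
        // delete from cf where p = 0 and c1 = 0
        const bool prefix_delete = it->first == 0;
        // delete from cf where p = 0 and c1 = 1 and c2 >= 50 and c2 < 101
        const bool range_delete = it->first == 1 && it->second >= 50 && it->second < 101;
        it = (prefix_delete || range_delete) ? rows.erase(it) : std::next(it);
    }
    return rows.size();
}
```

The survivors are the rows with `c1 = 1` and odd `c2` from 1 to 49, which is 25 rows, matching the `with_size(25)` assertion on the view.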
|
||||
SEASTAR_TEST_CASE(test_builder_with_concurrent_writes) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
auto gen = random_generator();
|
||||
|
||||
e.execute_cql("create table cf (p blob, c int, v int, primary key (p, c))").get();
|
||||
|
||||
const size_t rows = 6864;
|
||||
const size_t rows_per_partition = 4;
|
||||
const size_t partitions = rows / rows_per_partition;
|
||||
|
||||
std::unordered_set<sstring> keys;
|
||||
while (keys.size() != partitions) {
|
||||
keys.insert(to_hex(random_bytes(128, gen)));
|
||||
}
|
||||
|
||||
auto half = keys.begin();
|
||||
std::advance(half, keys.size() / 2);
|
||||
auto k = keys.begin();
|
||||
for (; k != half; ++k) {
|
||||
for (size_t i = 0; i < rows_per_partition; ++i) {
|
||||
e.execute_cql(sprint("insert into cf (p, c, v) values (0x%s, %d, 0)", *k, i)).get();
|
||||
}
|
||||
}
|
||||
|
||||
auto f = e.local_view_builder().wait_until_built("ks", "vcf", lowres_clock::now() + 60s);
|
||||
e.execute_cql("create materialized view vcf as select * from cf "
|
||||
"where p is not null and c is not null and v is not null "
|
||||
"primary key (v, c, p)").get();
|
||||
|
||||
for (; k != keys.end(); ++k) {
|
||||
for (size_t i = 0; i < rows_per_partition; ++i) {
|
||||
e.execute_cql(sprint("insert into cf (p, c, v) values (0x%s, %d, 0)", *k, i)).get();
|
||||
}
|
||||
}
|
||||
|
||||
f.get();
|
||||
eventually([&] {
|
||||
auto msg = e.execute_cql("select * from vcf").get0();
|
||||
assert_that(msg).is_rows().with_size(rows);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_builder_with_concurrent_drop) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
auto gen = random_generator();
|
||||
|
||||
e.execute_cql("create table cf (p blob, c int, v int, primary key (p, c))").get();
|
||||
|
||||
auto make_key = [&] (auto) { return to_hex(random_bytes(128, gen)); };
|
||||
for (auto&& k : boost::irange(0, 1000) | boost::adaptors::transformed(make_key)) {
|
||||
for (auto i = 0; i < 5; ++i) {
|
||||
e.execute_cql(sprint("insert into cf (p, c, v) values (0x%s, %d, 0)", k, i)).get();
|
||||
}
|
||||
}
|
||||
|
||||
e.execute_cql("create materialized view vcf as select * from cf "
|
||||
"where p is not null and c is not null and v is not null "
|
||||
"primary key (v, c, p)").get();
|
||||
|
||||
e.execute_cql("drop materialized view vcf").get();
|
||||
|
||||
eventually([&] {
|
||||
auto msg = e.execute_cql("select * from system.scylla_views_builds_in_progress").get0();
|
||||
assert_that(msg).is_rows().is_empty();
|
||||
msg = e.execute_cql("select * from system.built_views").get0();
|
||||
assert_that(msg).is_rows().is_empty();
|
||||
msg = e.execute_cql("select * from system.views_builds_in_progress").get0();
|
||||
assert_that(msg).is_rows().is_empty();
|
||||
msg = e.execute_cql("select * from system_distributed.view_build_status").get0();
|
||||
assert_that(msg).is_rows().is_empty();
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -33,24 +33,6 @@
 
 using namespace std::literals::chrono_literals;
 
-template<typename EventuallySucceedingFunction>
-static void eventually(EventuallySucceedingFunction&& f) {
-    constexpr unsigned max_attempts = 10;
-    unsigned attempts = 0;
-    while (true) {
-        try {
-            f();
-            break;
-        } catch (...) {
-            if (++attempts < max_attempts) {
-                sleep(std::chrono::milliseconds(1 << attempts)).get0();
-            } else {
-                throw;
-            }
-        }
-    }
-}
-
 SEASTAR_TEST_CASE(test_case_sensitivity) {
     return do_with_cql_env_thread([] (auto& e) {
         e.execute_cql("create table cf (theKey int, theClustering int, theValue int, primary key (theKey, theClustering));").get();
@@ -33,12 +33,14 @@
|
||||
#include "tests/test_services.hh"
|
||||
|
||||
#include "db/size_estimates_virtual_reader.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "core/future-util.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "cql3/untyped_result_set.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
|
||||
SEASTAR_TEST_CASE(test_query_virtual_table) {
|
||||
SEASTAR_TEST_CASE(test_query_size_estimates_virtual_table) {
|
||||
return do_with_cql_env([] (auto& e) {
|
||||
auto ranges = db::size_estimates::size_estimates_mutation_reader::get_local_ranges().get0();
|
||||
auto start_token1 = utf8_type->to_string(ranges[3].start);
|
||||
@@ -215,3 +217,50 @@ SEASTAR_TEST_CASE(test_query_virtual_table) {
|
||||
}).discard_result();
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_query_view_built_progress_virtual_table) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
auto rand = [] { return dht::global_partitioner().get_random_token(); };
|
||||
auto next_token = rand();
|
||||
auto next_token_str = dht::global_partitioner().to_sstring(next_token);
|
||||
db::system_keyspace::register_view_for_building("ks", "v1", rand()).get();
|
||||
db::system_keyspace::register_view_for_building("ks", "v2", rand()).get();
|
||||
db::system_keyspace::register_view_for_building("ks", "v3", rand()).get();
|
||||
db::system_keyspace::update_view_build_progress("ks", "v3", next_token).get();
|
||||
db::system_keyspace::register_view_for_building("ks", "v4", rand()).get();
|
||||
db::system_keyspace::update_view_build_progress("ks", "v4", next_token).get();
|
||||
db::system_keyspace::register_view_for_building("ks", "v5", rand()).get();
|
||||
db::system_keyspace::register_view_for_building("ks", "v6", rand()).get();
|
||||
db::system_keyspace::remove_view_build_progress_across_all_shards("ks", "v5").get();
|
||||
db::system_keyspace::remove_view_build_progress_across_all_shards("ks", "v6").get();
|
||||
auto rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks'").get0();
|
||||
assert_that(rs).is_rows().with_rows_ignore_order({
|
||||
{ {utf8_type->decompose(sstring("ks"))}, {utf8_type->decompose(sstring("v1"))}, {int32_type->decompose(0)}, { } },
|
||||
{ {utf8_type->decompose(sstring("ks"))}, {utf8_type->decompose(sstring("v2"))}, {int32_type->decompose(0)}, { } },
|
||||
{ {utf8_type->decompose(sstring("ks"))}, {utf8_type->decompose(sstring("v3"))}, {int32_type->decompose(0)}, {utf8_type->decompose(next_token_str)} },
|
||||
{ {utf8_type->decompose(sstring("ks"))}, {utf8_type->decompose(sstring("v4"))}, {int32_type->decompose(0)}, {utf8_type->decompose(next_token_str)} }
|
||||
});
|
||||
rs = e.execute_cql("select * from system.views_builds_in_progress").get0();
|
||||
assert_that(rs).is_rows().with_size(4);
|
||||
rs = e.execute_cql("select * from system.views_builds_in_progress limit 1").get0();
|
||||
assert_that(rs).is_rows().with_size(1);
|
||||
rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name = 'v2'").get0();
|
||||
assert_that(rs).is_rows().with_size(1);
|
||||
rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name > 'v2'").get0();
|
||||
assert_that(rs).is_rows().with_size(2);
|
||||
rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name >= 'v2'").get0();
|
||||
assert_that(rs).is_rows().with_size(3);
|
||||
rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name < 'v2'").get0();
|
||||
assert_that(rs).is_rows().with_size(1);
|
||||
rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name <= 'v2'").get0();
|
||||
assert_that(rs).is_rows().with_size(2);
|
||||
rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name in ('v1', 'v2', 'v3')").get0();
|
||||
assert_that(rs).is_rows().with_size(3);
|
||||
rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name >= 'v2' and view_name <= 'v2'").get0();
|
||||
assert_that(rs).is_rows().with_size(1);
|
||||
rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name >= 'v2' and view_name <= 'v3'").get0();
|
||||
assert_that(rs).is_rows().with_size(2);
|
||||
rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks' and view_name > 'v1' and view_name < 'v2'").get0();
|
||||
assert_that(rs).is_rows().with_size(0);
|
||||
});
|
||||
}
|
||||
|
||||