Compare commits
36 Commits
next
...
scylla-3.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e3e301906d | ||
|
|
2c822d4c1f | ||
|
|
04f8800b5b | ||
|
|
a72a06d3b7 | ||
|
|
f9b11c9b30 | ||
|
|
798357f656 | ||
|
|
7eb86fbbb4 | ||
|
|
edf431f581 | ||
|
|
3f358c9772 | ||
|
|
c8d5738a48 | ||
|
|
de5c06414b | ||
|
|
de314dfe30 | ||
|
|
40a077bf93 | ||
|
|
d488e762cf | ||
|
|
637d80ffcf | ||
|
|
39b17be562 | ||
|
|
e54df0585e | ||
|
|
7d113bd1e9 | ||
|
|
2e7cd77bc4 | ||
|
|
0a38d2b0ee | ||
|
|
2ff26d1160 | ||
|
|
9dd714ae64 | ||
|
|
3980570520 | ||
|
|
9889e553e6 | ||
|
|
3e0b09faa1 | ||
|
|
bc4106ff45 | ||
|
|
df3563c1ae | ||
|
|
1c89961c4f | ||
|
|
85b1a45252 | ||
|
|
6a847e2242 | ||
|
|
10cf0e0d91 | ||
|
|
8c1474c039 | ||
|
|
bb5e9527bb | ||
|
|
4dae72b2cd | ||
|
|
1e444a3dd5 | ||
|
|
76906d6134 |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=666.development
|
||||
VERSION=3.2.1
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -74,7 +74,7 @@ public:
|
||||
options() = default;
|
||||
options(const std::map<sstring, sstring>& map) {
|
||||
if (map.find("enabled") == std::end(map)) {
|
||||
throw exceptions::configuration_exception("Missing enabled CDC option");
|
||||
return;
|
||||
}
|
||||
|
||||
for (auto& p : map) {
|
||||
@@ -92,6 +92,9 @@ public:
|
||||
}
|
||||
}
|
||||
std::map<sstring, sstring> to_map() const {
|
||||
if (!_enabled) {
|
||||
return {};
|
||||
}
|
||||
return {
|
||||
{ "enabled", _enabled ? "true" : "false" },
|
||||
{ "preimage", _preimage ? "true" : "false" },
|
||||
|
||||
@@ -241,7 +241,10 @@ batch_size_fail_threshold_in_kb: 50
|
||||
# broadcast_rpc_address: 1.2.3.4
|
||||
|
||||
# Uncomment to enable experimental features
|
||||
# experimental: true
|
||||
# experimental_features:
|
||||
# - cdc
|
||||
# - lwt
|
||||
# - udf
|
||||
|
||||
# The directory where hints files are stored if hinted handoff is enabled.
|
||||
# hints_directory: /var/lib/scylla/hints
|
||||
|
||||
@@ -381,6 +381,7 @@ scylla_tests = [
|
||||
'tests/data_listeners_test',
|
||||
'tests/truncation_migration_test',
|
||||
'tests/like_matcher_test',
|
||||
'tests/enum_option_test',
|
||||
]
|
||||
|
||||
perf_tests = [
|
||||
@@ -875,6 +876,7 @@ pure_boost_tests = set([
|
||||
'tests/top_k_test',
|
||||
'tests/small_vector_test',
|
||||
'tests/like_matcher_test',
|
||||
'tests/enum_option_test',
|
||||
])
|
||||
|
||||
tests_not_using_seastar_test_framework = set([
|
||||
|
||||
@@ -478,7 +478,7 @@ inline bool single_column_primary_key_restrictions<clustering_key>::needs_filter
|
||||
// 3. a SLICE restriction isn't on a last place
|
||||
column_id position = 0;
|
||||
for (const auto& restriction : _restrictions->restrictions() | boost::adaptors::map_values) {
|
||||
if (restriction->is_contains() || position != restriction->get_column_def().id) {
|
||||
if (restriction->is_contains() || restriction->is_LIKE() || position != restriction->get_column_def().id) {
|
||||
return true;
|
||||
}
|
||||
if (!restriction->is_slice()) {
|
||||
|
||||
@@ -111,7 +111,9 @@ lw_shared_ptr<query::read_command> cas_request::read_command() const {
|
||||
} else {
|
||||
ranges = query::clustering_range::deoverlap(std::move(ranges), clustering_key::tri_compare(*_schema));
|
||||
}
|
||||
query::partition_slice ps(std::move(ranges), *_schema, columns_to_read, update_parameters::options);
|
||||
auto options = update_parameters::options;
|
||||
options.set(query::partition_slice::option::always_return_static_content);
|
||||
query::partition_slice ps(std::move(ranges), *_schema, columns_to_read, options);
|
||||
ps.set_partition_row_limit(max_rows);
|
||||
return make_lw_shared<query::read_command>(_schema->id(), _schema->version(), std::move(ps));
|
||||
}
|
||||
|
||||
@@ -60,7 +60,6 @@ public:
|
||||
static constexpr query::partition_slice::option_set options = query::partition_slice::option_set::of<
|
||||
query::partition_slice::option::send_partition_key,
|
||||
query::partition_slice::option::send_clustering_key,
|
||||
query::partition_slice::option::always_return_static_content,
|
||||
query::partition_slice::option::collections_as_maps>();
|
||||
|
||||
// Holder for data for
|
||||
|
||||
@@ -1984,7 +1984,8 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
|
||||
return make_multishard_combining_reader(make_shared<streaming_reader_lifecycle_policy>(db), partitioner, std::move(s), pr, ps, pc,
|
||||
std::move(trace_state), fwd_mr);
|
||||
});
|
||||
return make_flat_multi_range_reader(std::move(schema), std::move(ms), std::move(range_generator), schema->full_slice(),
|
||||
auto&& full_slice = schema->full_slice();
|
||||
return make_flat_multi_range_reader(std::move(schema), std::move(ms), std::move(range_generator), std::move(full_slice),
|
||||
service::get_local_streaming_read_priority(), {}, mutation_reader::forwarding::no);
|
||||
}
|
||||
|
||||
|
||||
@@ -1241,6 +1241,34 @@ void db::commitlog::segment_manager::flush_segments(bool force) {
|
||||
}
|
||||
}
|
||||
|
||||
/// \brief Helper for ensuring a file is closed if an exception is thrown.
|
||||
///
|
||||
/// The file provided by the file_fut future is passed to func.
|
||||
/// * If func throws an exception E, the file is closed and we return
|
||||
/// a failed future with E.
|
||||
/// * If func returns a value V, the file is not closed and we return
|
||||
/// a future with V.
|
||||
/// Note that when an exception is not thrown, it is the
|
||||
/// responsibility of func to make sure the file will be closed. It
|
||||
/// can close the file itself, return it, or store it somewhere.
|
||||
///
|
||||
/// \tparam Func The type of function this wraps
|
||||
/// \param file_fut A future that produces a file
|
||||
/// \param func A function that uses a file
|
||||
/// \return A future that passes the file produced by file_fut to func
|
||||
/// and closes it if func fails
|
||||
template <typename Func>
|
||||
static auto close_on_failure(future<file> file_fut, Func func) {
|
||||
return file_fut.then([func = std::move(func)](file f) {
|
||||
return futurize_apply(func, f).handle_exception([f] (std::exception_ptr e) mutable {
|
||||
return f.close().then_wrapped([f, e = std::move(e)] (future<> x) {
|
||||
using futurator = futurize<std::result_of_t<Func(file)>>;
|
||||
return futurator::make_exception_future(e);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager::allocate_segment_ex(const descriptor& d, sstring filename, open_flags flags, bool active) {
|
||||
file_open_options opt;
|
||||
opt.extent_allocation_size_hint = max_size;
|
||||
@@ -1258,7 +1286,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
|
||||
return fut;
|
||||
});
|
||||
|
||||
return fut.then([this, d, active, filename, flags](file f) {
|
||||
return close_on_failure(std::move(fut), [this, d, active, filename, flags] (file f) {
|
||||
f = make_checked_file(commit_error_handler, f);
|
||||
// xfs doesn't like files extended betond eof, so enlarge the file
|
||||
auto fut = make_ready_future<>();
|
||||
|
||||
@@ -276,7 +276,7 @@ future<> db::commitlog_replayer::impl::process(stats* s, commitlog::buffer_and_r
|
||||
}
|
||||
|
||||
auto shard = _db.local().shard_of(fm);
|
||||
return _db.invoke_on(shard, [this, cer = std::move(cer), &src_cm, rp, shard, s] (database& db) -> future<> {
|
||||
return _db.invoke_on(shard, [this, cer = std::move(cer), &src_cm, rp, shard, s] (database& db) mutable -> future<> {
|
||||
auto& fm = cer.mutation();
|
||||
// TODO: might need better verification that the deserialized mutation
|
||||
// is schema compatible. My guess is that just applying the mutation
|
||||
@@ -306,7 +306,9 @@ future<> db::commitlog_replayer::impl::process(stats* s, commitlog::buffer_and_r
|
||||
return db.apply_in_memory(m, cf, db::rp_handle(), db::no_timeout);
|
||||
});
|
||||
} else {
|
||||
return db.apply_in_memory(fm, cf.schema(), db::rp_handle(), db::no_timeout);
|
||||
return do_with(std::move(cer).mutation(), [&](const frozen_mutation& m) {
|
||||
return db.apply_in_memory(m, cf.schema(), db::rp_handle(), db::no_timeout);
|
||||
});
|
||||
}
|
||||
}).then_wrapped([s] (future<> f) {
|
||||
try {
|
||||
|
||||
39
db/config.cc
39
db/config.cc
@@ -22,6 +22,7 @@
|
||||
|
||||
#include <unordered_map>
|
||||
#include <regex>
|
||||
#include <sstream>
|
||||
|
||||
#include <boost/any.hpp>
|
||||
#include <boost/program_options.hpp>
|
||||
@@ -108,6 +109,10 @@ const config_type config_type_for<int32_t> = config_type("integer", value_to_jso
|
||||
template <>
|
||||
const config_type config_type_for<db::seed_provider_type> = config_type("seed provider", seed_provider_to_json);
|
||||
|
||||
template <>
|
||||
const config_type config_type_for<std::vector<enum_option<db::experimental_features_t>>> = config_type(
|
||||
"experimental features", value_to_json<std::vector<sstring>>);
|
||||
|
||||
}
|
||||
|
||||
namespace YAML {
|
||||
@@ -153,6 +158,23 @@ struct convert<db::config::seed_provider_type> {
|
||||
}
|
||||
};
|
||||
|
||||
template <>
|
||||
class convert<enum_option<db::experimental_features_t>> {
|
||||
public:
|
||||
static bool decode(const Node& node, enum_option<db::experimental_features_t>& rhs) {
|
||||
std::string name;
|
||||
if (!convert<std::string>::decode(node, name)) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
std::istringstream(name) >> rhs;
|
||||
} catch (boost::program_options::invalid_option_value&) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#if defined(DEBUG)
|
||||
@@ -669,7 +691,8 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, shutdown_announce_in_ms(this, "shutdown_announce_in_ms", value_status::Used, 2 * 1000, "Time a node waits after sending gossip shutdown message in milliseconds. Same as -Dcassandra.shutdown_announce_in_ms in cassandra.")
|
||||
, developer_mode(this, "developer_mode", value_status::Used, false, "Relax environment checks. Setting to true can reduce performance and reliability significantly.")
|
||||
, skip_wait_for_gossip_to_settle(this, "skip_wait_for_gossip_to_settle", value_status::Used, -1, "An integer to configure the wait for gossip to settle. -1: wait normally, 0: do not wait at all, n: wait for at most n polls. Same as -Dcassandra.skip_wait_for_gossip_to_settle in cassandra.")
|
||||
, experimental(this, "experimental", value_status::Used, false, "Set to true to unlock experimental features.")
|
||||
, experimental(this, "experimental", value_status::Used, false, "Set to true to unlock all experimental features.")
|
||||
, experimental_features(this, "experimental_features", value_status::Used, {}, "Unlock experimental features provided as the option arguments (possible values: 'lwt', 'cdc', 'udf'). Can be repeated.")
|
||||
, lsa_reclamation_step(this, "lsa_reclamation_step", value_status::Used, 1, "Minimum number of segments to reclaim in a single step")
|
||||
, prometheus_port(this, "prometheus_port", value_status::Used, 9180, "Prometheus port, set to zero to disable")
|
||||
, prometheus_address(this, "prometheus_address", value_status::Used, "0.0.0.0", "Prometheus listening address")
|
||||
@@ -779,10 +802,12 @@ db::fs::path db::config::get_conf_dir() {
|
||||
return confdir;
|
||||
}
|
||||
|
||||
void db::config::check_experimental(const sstring& what) const {
|
||||
if (!experimental()) {
|
||||
throw std::runtime_error(format("{} is currently disabled. Start Scylla with --experimental=on to enable.", what));
|
||||
bool db::config::check_experimental(experimental_features_t::feature f) const {
|
||||
if (experimental()) {
|
||||
return true;
|
||||
}
|
||||
const auto& optval = experimental_features();
|
||||
return find(begin(optval), end(optval), enum_option<experimental_features_t>{f}) != end(optval);
|
||||
}
|
||||
|
||||
namespace bpo = boost::program_options;
|
||||
@@ -827,6 +852,12 @@ const db::extensions& db::config::extensions() const {
|
||||
return *_extensions;
|
||||
}
|
||||
|
||||
std::unordered_map<sstring, db::experimental_features_t::feature> db::experimental_features_t::map() {
|
||||
// We decided against using the construct-on-first-use idiom here:
|
||||
// https://github.com/scylladb/scylla/pull/5369#discussion_r353614807
|
||||
return {{"lwt", LWT}, {"udf", UDF}, {"cdc", CDC}};
|
||||
}
|
||||
|
||||
template struct utils::config_file::named_value<seastar::log_level>;
|
||||
|
||||
namespace utils {
|
||||
|
||||
12
db/config.hh
12
db/config.hh
@@ -33,6 +33,7 @@
|
||||
|
||||
#include "seastarx.hh"
|
||||
#include "utils/config_file.hh"
|
||||
#include "utils/enum_option.hh"
|
||||
|
||||
namespace seastar { class file; struct logging_settings; }
|
||||
|
||||
@@ -75,14 +76,20 @@ sstring config_value_as_json(const std::unordered_map<sstring, log_level>& v);
|
||||
|
||||
namespace db {
|
||||
|
||||
/// Enumeration of all valid values for the `experimental` config entry.
|
||||
struct experimental_features_t {
|
||||
enum feature { LWT, UDF, CDC };
|
||||
static std::unordered_map<sstring, feature> map(); // See enum_option.
|
||||
};
|
||||
|
||||
class config : public utils::config_file {
|
||||
public:
|
||||
config();
|
||||
config(std::shared_ptr<db::extensions>);
|
||||
~config();
|
||||
|
||||
// Throws exception if experimental feature is disabled.
|
||||
void check_experimental(const sstring& what) const;
|
||||
/// True iff the feature is enabled.
|
||||
bool check_experimental(experimental_features_t::feature f) const;
|
||||
|
||||
/**
|
||||
* Scans the environment variables for configuration files directory
|
||||
@@ -263,6 +270,7 @@ public:
|
||||
named_value<bool> developer_mode;
|
||||
named_value<int32_t> skip_wait_for_gossip_to_settle;
|
||||
named_value<bool> experimental;
|
||||
named_value<std::vector<enum_option<experimental_features_t>>> experimental_features;
|
||||
named_value<size_t> lsa_reclamation_step;
|
||||
named_value<uint16_t> prometheus_port;
|
||||
named_value<sstring> prometheus_address;
|
||||
|
||||
@@ -33,12 +33,14 @@ enum class schema_feature {
|
||||
// See https://github.com/scylladb/scylla/issues/4485
|
||||
DIGEST_INSENSITIVE_TO_EXPIRY,
|
||||
COMPUTED_COLUMNS,
|
||||
CDC_OPTIONS,
|
||||
};
|
||||
|
||||
using schema_features = enum_set<super_enum<schema_feature,
|
||||
schema_feature::VIEW_VIRTUAL_COLUMNS,
|
||||
schema_feature::DIGEST_INSENSITIVE_TO_EXPIRY,
|
||||
schema_feature::COMPUTED_COLUMNS
|
||||
schema_feature::COMPUTED_COLUMNS,
|
||||
schema_feature::CDC_OPTIONS
|
||||
>>;
|
||||
|
||||
}
|
||||
|
||||
@@ -294,19 +294,24 @@ schema_ptr tables() {
|
||||
}
|
||||
|
||||
// Holds Scylla-specific table metadata.
|
||||
schema_ptr scylla_tables() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_ptr scylla_tables(schema_features features) {
|
||||
static auto make = [] (bool has_cdc_options) -> schema_ptr {
|
||||
auto id = generate_legacy_id(NAME, SCYLLA_TABLES);
|
||||
return schema_builder(NAME, SCYLLA_TABLES, std::make_optional(id))
|
||||
auto sb = schema_builder(NAME, SCYLLA_TABLES, std::make_optional(id))
|
||||
.with_column("keyspace_name", utf8_type, column_kind::partition_key)
|
||||
.with_column("table_name", utf8_type, column_kind::clustering_key)
|
||||
.with_column("version", uuid_type)
|
||||
.with_column("cdc", map_type_impl::get_instance(utf8_type, utf8_type, false))
|
||||
.set_gc_grace_seconds(schema_gc_grace)
|
||||
.with_version(generate_schema_version(id))
|
||||
.build();
|
||||
}();
|
||||
return schema;
|
||||
.set_gc_grace_seconds(schema_gc_grace);
|
||||
if (has_cdc_options) {
|
||||
sb.with_column("cdc", map_type_impl::get_instance(utf8_type, utf8_type, false));
|
||||
sb.with_version(generate_schema_version(id, 1));
|
||||
} else {
|
||||
sb.with_version(generate_schema_version(id));
|
||||
}
|
||||
return sb.build();
|
||||
};
|
||||
static thread_local schema_ptr schemas[2] = { make(false), make(true) };
|
||||
return schemas[features.contains(schema_feature::CDC_OPTIONS)];
|
||||
}
|
||||
|
||||
// The "columns" table lists the definitions of all columns in all tables
|
||||
@@ -608,14 +613,28 @@ schema_ptr aggregates() {
|
||||
}
|
||||
#endif
|
||||
|
||||
static
|
||||
mutation
|
||||
redact_columns_for_missing_features(mutation m, schema_features features) {
|
||||
if (features.contains(schema_feature::CDC_OPTIONS)) {
|
||||
return std::move(m);
|
||||
}
|
||||
if (m.schema()->cf_name() != SCYLLA_TABLES) {
|
||||
return std::move(m);
|
||||
}
|
||||
slogger.debug("adjusting schema_tables mutation due to possible in-progress cluster upgrade");
|
||||
m.upgrade(scylla_tables(features));
|
||||
return std::move(m);
|
||||
}
|
||||
|
||||
/**
|
||||
* Read schema from system keyspace and calculate MD5 digest of every row, resulting digest
|
||||
* will be converted into UUID which would act as content-based version of the schema.
|
||||
*/
|
||||
future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features features)
|
||||
{
|
||||
auto map = [&proxy] (sstring table) {
|
||||
return db::system_keyspace::query_mutations(proxy, NAME, table).then([&proxy, table] (auto rs) {
|
||||
auto map = [&proxy, features] (sstring table) {
|
||||
return db::system_keyspace::query_mutations(proxy, NAME, table).then([&proxy, table, features] (auto rs) {
|
||||
auto s = proxy.local().get_db().local().find_schema(NAME, table);
|
||||
std::vector<mutation> mutations;
|
||||
for (auto&& p : rs->partitions()) {
|
||||
@@ -624,6 +643,7 @@ future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>&
|
||||
if (is_system_keyspace(partition_key)) {
|
||||
continue;
|
||||
}
|
||||
mut = redact_columns_for_missing_features(std::move(mut), features);
|
||||
mutations.emplace_back(std::move(mut));
|
||||
}
|
||||
return mutations;
|
||||
@@ -647,8 +667,8 @@ future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>&
|
||||
|
||||
future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<service::storage_proxy>& proxy, schema_features features)
|
||||
{
|
||||
auto map = [&proxy] (sstring table) {
|
||||
return db::system_keyspace::query_mutations(proxy, NAME, table).then([&proxy, table] (auto rs) {
|
||||
auto map = [&proxy, features] (sstring table) {
|
||||
return db::system_keyspace::query_mutations(proxy, NAME, table).then([&proxy, table, features] (auto rs) {
|
||||
auto s = proxy.local().get_db().local().find_schema(NAME, table);
|
||||
std::vector<canonical_mutation> results;
|
||||
for (auto&& p : rs->partitions()) {
|
||||
@@ -657,6 +677,7 @@ future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<
|
||||
if (is_system_keyspace(partition_key)) {
|
||||
continue;
|
||||
}
|
||||
mut = redact_columns_for_missing_features(std::move(mut), features);
|
||||
results.emplace_back(mut);
|
||||
}
|
||||
return results;
|
||||
@@ -669,6 +690,14 @@ future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<
|
||||
return map_reduce(all_table_names(features), map, std::vector<canonical_mutation>{}, reduce);
|
||||
}
|
||||
|
||||
std::vector<mutation>
|
||||
adjust_schema_for_schema_features(std::vector<mutation> schema, schema_features features) {
|
||||
for (auto& m : schema) {
|
||||
m = redact_columns_for_missing_features(m, features);
|
||||
}
|
||||
return std::move(schema);
|
||||
}
|
||||
|
||||
future<schema_result>
|
||||
read_schema_for_keyspaces(distributed<service::storage_proxy>& proxy, const sstring& schema_table_name, const std::set<sstring>& keyspace_names)
|
||||
{
|
||||
@@ -1673,7 +1702,19 @@ mutation make_scylla_tables_mutation(schema_ptr table, api::timestamp_type times
|
||||
auto ckey = clustering_key::from_singular(*s, table->cf_name());
|
||||
mutation m(scylla_tables(), pkey);
|
||||
m.set_clustered_cell(ckey, "version", utils::UUID(table->version()), timestamp);
|
||||
store_map(m, ckey, "cdc", timestamp, table->cdc_options().to_map());
|
||||
auto cdc_options = table->cdc_options().to_map();
|
||||
if (!cdc_options.empty()) {
|
||||
store_map(m, ckey, "cdc", timestamp, cdc_options);
|
||||
} else {
|
||||
// Avoid storing anything for cdc disabled, so we don't end up with
|
||||
// different digests on different nodes due to the other node redacting
|
||||
// the cdc column when the cdc cluster feature is disabled.
|
||||
//
|
||||
// Tombstones are not considered for schema digest, so this is okay (and
|
||||
// needed in order for disabling of cdc to have effect).
|
||||
auto& cdc_cdef = *scylla_tables()->get_column_definition("cdc");
|
||||
m.set_clustered_cell(ckey, cdc_cdef, atomic_cell::make_dead(timestamp, gc_clock::now()));
|
||||
}
|
||||
return m;
|
||||
}
|
||||
|
||||
|
||||
@@ -109,7 +109,7 @@ schema_ptr view_virtual_columns();
|
||||
schema_ptr dropped_columns();
|
||||
schema_ptr indexes();
|
||||
schema_ptr tables();
|
||||
schema_ptr scylla_tables();
|
||||
schema_ptr scylla_tables(schema_features features = schema_features::full());
|
||||
schema_ptr views();
|
||||
schema_ptr computed_columns();
|
||||
|
||||
@@ -154,6 +154,7 @@ future<> save_system_keyspace_schema();
|
||||
future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features);
|
||||
|
||||
future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<service::storage_proxy>& proxy, schema_features);
|
||||
std::vector<mutation> adjust_schema_for_schema_features(std::vector<mutation> schema, schema_features features);
|
||||
|
||||
future<schema_result_value_type>
|
||||
read_schema_partition_for_keyspace(distributed<service::storage_proxy>& proxy, const sstring& schema_table_name, const sstring& keyspace_name);
|
||||
|
||||
@@ -104,10 +104,10 @@ api::timestamp_type schema_creation_timestamp() {
|
||||
// FIXME: Make automatic by calculating from schema structure.
|
||||
static const uint16_t version_sequence_number = 1;
|
||||
|
||||
table_schema_version generate_schema_version(utils::UUID table_id) {
|
||||
table_schema_version generate_schema_version(utils::UUID table_id, uint16_t offset) {
|
||||
md5_hasher h;
|
||||
feed_hash(h, table_id);
|
||||
feed_hash(h, version_sequence_number);
|
||||
feed_hash(h, version_sequence_number + offset);
|
||||
return utils::UUID_gen::get_name_UUID(h.finalize());
|
||||
}
|
||||
|
||||
|
||||
@@ -152,7 +152,7 @@ schema_ptr aggregates();
|
||||
|
||||
}
|
||||
|
||||
table_schema_version generate_schema_version(utils::UUID table_id);
|
||||
table_schema_version generate_schema_version(utils::UUID table_id, uint16_t offset = 0);
|
||||
|
||||
// Only for testing.
|
||||
void minimal_setup(distributed<database>& db, distributed<cql3::query_processor>& qp);
|
||||
|
||||
@@ -307,7 +307,7 @@ deletable_row& view_updates::get_view_row(const partition_key& base_key, const c
|
||||
if (!cdef.is_computed()) {
|
||||
//FIXME(sarna): this legacy code is here for backward compatibility and should be removed
|
||||
// once "computed_columns feature" is supported by every node
|
||||
if (!service::get_local_storage_service().db().local().find_column_family(_base->id()).get_index_manager().is_index(*_base)) {
|
||||
if (!service::get_local_storage_service().db().local().find_column_family(_base->id()).get_index_manager().is_index(*_view)) {
|
||||
throw std::logic_error(format("Column {} doesn't exist in base and this view is not backing a secondary index", cdef.name_as_text()));
|
||||
}
|
||||
computed_value = token_column_computation().compute_value(*_base, base_key, update);
|
||||
|
||||
3
dist/common/scripts/scylla_prepare
vendored
3
dist/common/scripts/scylla_prepare
vendored
@@ -63,7 +63,7 @@ if __name__ == '__main__':
|
||||
run('ip link set dev {TAP} master {BRIDGE}'.format(TAP=tap, BRIDGE=bridge))
|
||||
run('chown {USER}.{GROUP} /dev/vhost-net'.format(USER=user, GROUP=group))
|
||||
elif mode == 'dpdk':
|
||||
ethpcciid = cfg.get('ETHPCIID')
|
||||
ethpciid = cfg.get('ETHPCIID')
|
||||
nr_hugepages = cfg.get('NR_HUGEPAGES')
|
||||
run('modprobe uio')
|
||||
run('modprobe uio_pci_generic')
|
||||
@@ -73,7 +73,6 @@ if __name__ == '__main__':
|
||||
f.write(nr_hugepages)
|
||||
if dist_name() == 'Ubuntu':
|
||||
run('hugeadm --create-mounts')
|
||||
fi
|
||||
else:
|
||||
set_nic_and_disks = get_set_nic_and_disks_config_value(cfg)
|
||||
ifname = cfg.get('IFNAME')
|
||||
|
||||
1
dist/debian/debian/scylla-server.install
vendored
1
dist/debian/debian/scylla-server.install
vendored
@@ -21,6 +21,7 @@ opt/scylladb/scripts/libexec/*
|
||||
opt/scylladb/bin/*
|
||||
opt/scylladb/libreloc/*
|
||||
opt/scylladb/libexec/*
|
||||
usr/lib/scylla/*
|
||||
var/lib/scylla/data
|
||||
var/lib/scylla/commitlog
|
||||
var/lib/scylla/hints
|
||||
|
||||
4
dist/debian/debian/scylla-server.postinst
vendored
4
dist/debian/debian/scylla-server.postinst
vendored
@@ -24,10 +24,6 @@ if [ "$1" = configure ]; then
|
||||
fi
|
||||
|
||||
ln -sfT /etc/scylla /var/lib/scylla/conf
|
||||
if [ -d /usr/lib/scylla ]; then
|
||||
mv /usr/lib/scylla /usr/lib/scylla.old
|
||||
fi
|
||||
ln -sfT /opt/scylladb/scripts /usr/lib/scylla
|
||||
|
||||
grep -v api_ui_dir /etc/scylla/scylla.yaml | grep -v api_doc_dir > /tmp/scylla.yaml
|
||||
echo "api_ui_dir: /opt/scylladb/swagger-ui/dist/" >> /tmp/scylla.yaml
|
||||
|
||||
2
dist/docker/redhat/Dockerfile
vendored
2
dist/docker/redhat/Dockerfile
vendored
@@ -5,7 +5,7 @@ MAINTAINER Avi Kivity <avi@cloudius-systems.com>
|
||||
ENV container docker
|
||||
|
||||
# The SCYLLA_REPO_URL argument specifies the URL to the RPM repository this Docker image uses to install Scylla. The default value is the Scylla's unstable RPM repository, which contains the daily build.
|
||||
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo
|
||||
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/branch-3.2/latest/scylla.repo
|
||||
|
||||
ADD scylla_bashrc /scylla_bashrc
|
||||
|
||||
|
||||
2
dist/docker/redhat/scylla-jmx-service.sh
vendored
2
dist/docker/redhat/scylla-jmx-service.sh
vendored
@@ -2,4 +2,4 @@
|
||||
|
||||
source /etc/sysconfig/scylla-jmx
|
||||
|
||||
exec /opt/scylladb/scripts/jmx/scylla-jmx -l /opt/scylladb/scripts/jmx
|
||||
exec /opt/scylladb/jmx/scylla-jmx -l /opt/scylladb/jmx
|
||||
|
||||
10
dist/redhat/scylla.spec.mustache
vendored
10
dist/redhat/scylla.spec.mustache
vendored
@@ -15,6 +15,8 @@ Obsoletes: scylla-server < 1.1
|
||||
%global __brp_python_bytecompile %{nil}
|
||||
%global __brp_mangle_shebangs %{nil}
|
||||
|
||||
%undefine _find_debuginfo_dwz_opts
|
||||
|
||||
%description
|
||||
Scylla is a highly scalable, eventually consistent, distributed,
|
||||
partitioned row DB.
|
||||
@@ -75,9 +77,6 @@ getent passwd scylla || /usr/sbin/useradd -g scylla -s /sbin/nologin -r -d %{_sh
|
||||
if [ -f /etc/systemd/coredump.conf ];then
|
||||
/opt/scylladb/scripts/scylla_coredump_setup
|
||||
fi
|
||||
if [ -d /usr/lib/scylla ]; then
|
||||
mv /usr/lib/scylla /usr/lib/scylla.old
|
||||
fi
|
||||
|
||||
/opt/scylladb/scripts/scylla_post_install.sh
|
||||
|
||||
@@ -95,10 +94,6 @@ if [ -d /tmp/%{name}-%{version}-%{release} ]; then
|
||||
rm -rf /tmp/%{name}-%{version}-%{release}/
|
||||
fi
|
||||
ln -sfT /etc/scylla /var/lib/scylla/conf
|
||||
if [ -d /usr/lib/scylla ]; then
|
||||
mv /usr/lib/scylla /usr/lib/scylla.old
|
||||
fi
|
||||
ln -sfT /opt/scylladb/scripts /usr/lib/scylla
|
||||
|
||||
%clean
|
||||
rm -rf $RPM_BUILD_ROOT
|
||||
@@ -130,6 +125,7 @@ rm -rf $RPM_BUILD_ROOT
|
||||
/opt/scylladb/bin/*
|
||||
/opt/scylladb/libreloc/*
|
||||
/opt/scylladb/libexec/*
|
||||
%{_prefix}/lib/scylla/*
|
||||
%attr(0755,scylla,scylla) %dir %{_sharedstatedir}/scylla/
|
||||
%attr(0755,scylla,scylla) %dir %{_sharedstatedir}/scylla/data
|
||||
%attr(0755,scylla,scylla) %dir %{_sharedstatedir}/scylla/commitlog
|
||||
|
||||
@@ -69,11 +69,6 @@ struct get_sync_boundary_response {
|
||||
uint64_t new_rows_nr;
|
||||
};
|
||||
|
||||
struct get_combined_row_hash_response {
|
||||
repair_hash working_row_buf_combined_csum;
|
||||
uint64_t working_row_buf_nr;
|
||||
};
|
||||
|
||||
enum class row_level_diff_detect_algorithm : uint8_t {
|
||||
send_full_set,
|
||||
send_full_set_rpc_stream,
|
||||
|
||||
14
install.sh
14
install.sh
@@ -219,6 +219,20 @@ EOS
|
||||
for i in $SBINFILES; do
|
||||
ln -srf "$rprefix/scripts/$i" "$rusr/sbin/$i"
|
||||
done
|
||||
|
||||
# we need keep /usr/lib/scylla directory to support upgrade/downgrade
|
||||
# without error, so we need to create symlink for each script on the
|
||||
# directory
|
||||
install -m755 -d "$rusr"/lib/scylla/scyllatop/views
|
||||
for i in $(find "$rprefix"/scripts/ -maxdepth 1 -type f); do
|
||||
ln -srf $i "$rusr"/lib/scylla/
|
||||
done
|
||||
for i in $(find "$rprefix"/scyllatop/ -maxdepth 1 -type f); do
|
||||
ln -srf $i "$rusr"/lib/scylla/scyllatop
|
||||
done
|
||||
for i in $(find "$rprefix"/scyllatop/views -maxdepth 1 -type f); do
|
||||
ln -srf $i "$rusr"/lib/scylla/scyllatop/views
|
||||
done
|
||||
else
|
||||
install -m755 -d "$rdata"/saved_caches
|
||||
install -d -m755 "$retc"/systemd/system/scylla-server.service.d
|
||||
|
||||
@@ -1876,7 +1876,7 @@ bool row_marker::compact_and_expire(tombstone tomb, gc_clock::time_point now,
|
||||
_timestamp = api::missing_timestamp;
|
||||
return false;
|
||||
}
|
||||
if (_ttl > no_ttl && _expiry < now) {
|
||||
if (_ttl > no_ttl && _expiry <= now) {
|
||||
_expiry -= _ttl;
|
||||
_ttl = dead;
|
||||
}
|
||||
|
||||
@@ -679,7 +679,7 @@ public:
|
||||
if (is_missing() || _ttl == dead) {
|
||||
return false;
|
||||
}
|
||||
if (_ttl != no_ttl && _expiry < now) {
|
||||
if (_ttl != no_ttl && _expiry <= now) {
|
||||
return false;
|
||||
}
|
||||
return _timestamp > t.timestamp;
|
||||
@@ -689,7 +689,7 @@ public:
|
||||
if (_ttl == dead) {
|
||||
return true;
|
||||
}
|
||||
return _ttl != no_ttl && _expiry < now;
|
||||
return _ttl != no_ttl && _expiry <= now;
|
||||
}
|
||||
// Can be called only when is_live().
|
||||
bool is_expiring() const {
|
||||
|
||||
@@ -321,11 +321,7 @@ struct get_sync_boundary_response {
|
||||
};
|
||||
|
||||
// Return value of the REPAIR_GET_COMBINED_ROW_HASH RPC verb
|
||||
struct get_combined_row_hash_response {
|
||||
repair_hash working_row_buf_combined_csum;
|
||||
// The number of rows in the working row buf
|
||||
uint64_t working_row_buf_nr;
|
||||
};
|
||||
using get_combined_row_hash_response = repair_hash;
|
||||
|
||||
struct node_repair_meta_id {
|
||||
gms::inet_address ip;
|
||||
|
||||
@@ -448,6 +448,10 @@ class repair_writer {
|
||||
std::vector<std::optional<seastar::queue<mutation_fragment_opt>>> _mq;
|
||||
// Current partition written to disk
|
||||
std::vector<lw_shared_ptr<const decorated_key_with_hash>> _current_dk_written_to_sstable;
|
||||
// Is current partition still open. A partition is opened when a
|
||||
// partition_start is written and is closed when a partition_end is
|
||||
// written.
|
||||
std::vector<bool> _partition_opened;
|
||||
public:
|
||||
repair_writer(
|
||||
schema_ptr schema,
|
||||
@@ -462,10 +466,13 @@ public:
|
||||
future<> write_start_and_mf(lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf, unsigned node_idx) {
|
||||
_current_dk_written_to_sstable[node_idx] = dk;
|
||||
if (mf.is_partition_start()) {
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf)));
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf))).then([this, node_idx] {
|
||||
_partition_opened[node_idx] = true;
|
||||
});
|
||||
} else {
|
||||
auto start = mutation_fragment(partition_start(dk->dk, tombstone()));
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(start))).then([this, node_idx, mf = std::move(mf)] () mutable {
|
||||
_partition_opened[node_idx] = true;
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf)));
|
||||
});
|
||||
}
|
||||
@@ -475,6 +482,7 @@ public:
|
||||
_writer_done.resize(_nr_peer_nodes);
|
||||
_mq.resize(_nr_peer_nodes);
|
||||
_current_dk_written_to_sstable.resize(_nr_peer_nodes);
|
||||
_partition_opened.resize(_nr_peer_nodes, false);
|
||||
}
|
||||
|
||||
void create_writer(unsigned node_idx) {
|
||||
@@ -519,12 +527,21 @@ public:
|
||||
t.stream_in_progress());
|
||||
}
|
||||
|
||||
future<> write_partition_end(unsigned node_idx) {
|
||||
if (_partition_opened[node_idx]) {
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment(partition_end())).then([this, node_idx] {
|
||||
_partition_opened[node_idx] = false;
|
||||
});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> do_write(unsigned node_idx, lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf) {
|
||||
if (_current_dk_written_to_sstable[node_idx]) {
|
||||
if (_current_dk_written_to_sstable[node_idx]->dk.equal(*_schema, dk->dk)) {
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf)));
|
||||
} else {
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment(partition_end())).then([this,
|
||||
return write_partition_end(node_idx).then([this,
|
||||
node_idx, dk = std::move(dk), mf = std::move(mf)] () mutable {
|
||||
return write_start_and_mf(std::move(dk), std::move(mf), node_idx);
|
||||
});
|
||||
@@ -538,7 +555,7 @@ public:
|
||||
return parallel_for_each(boost::irange(unsigned(0), unsigned(_nr_peer_nodes)), [this] (unsigned node_idx) {
|
||||
if (_writer_done[node_idx] && _mq[node_idx]) {
|
||||
// Partition_end is never sent on wire, so we have to write one ourselves.
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment(partition_end())).then([this, node_idx] () mutable {
|
||||
return write_partition_end(node_idx).then([this, node_idx] () mutable {
|
||||
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt()).then([this, node_idx] () mutable {
|
||||
return (*_writer_done[node_idx]).then([] (uint64_t partitions) {
|
||||
@@ -1098,14 +1115,14 @@ private:
|
||||
_working_row_buf_combined_hash.clear();
|
||||
|
||||
if (_row_buf.empty()) {
|
||||
return make_ready_future<get_combined_row_hash_response>(get_combined_row_hash_response{repair_hash(), 0});
|
||||
return make_ready_future<get_combined_row_hash_response>(get_combined_row_hash_response());
|
||||
}
|
||||
return move_row_buf_to_working_row_buf().then([this] {
|
||||
return do_for_each(_working_row_buf, [this] (repair_row& r) {
|
||||
_working_row_buf_combined_hash.add(r.hash());
|
||||
return make_ready_future<>();
|
||||
}).then([this] {
|
||||
return get_combined_row_hash_response{_working_row_buf_combined_hash, _working_row_buf.size()};
|
||||
return get_combined_row_hash_response{_working_row_buf_combined_hash};
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -1352,7 +1369,9 @@ public:
|
||||
auto source_op = get_full_row_hashes_source_op(current_hashes, remote_node, node_idx, source);
|
||||
auto sink_op = get_full_row_hashes_sink_op(sink);
|
||||
return when_all_succeed(std::move(source_op), std::move(sink_op));
|
||||
}).then([current_hashes] () mutable {
|
||||
}).then([this, current_hashes] () mutable {
|
||||
stats().rx_hashes_nr += current_hashes->size();
|
||||
_metrics.rx_hashes_nr += current_hashes->size();
|
||||
return std::move(*current_hashes);
|
||||
});
|
||||
}
|
||||
@@ -1763,6 +1782,7 @@ static future<stop_iteration> repair_get_row_diff_with_rpc_stream_process_op(
|
||||
return make_exception_future<stop_iteration>(std::runtime_error("get_row_diff_with_rpc_stream: Inject error in handler loop"));
|
||||
}
|
||||
bool needs_all_rows = hash_cmd.cmd == repair_stream_cmd::needs_all_rows;
|
||||
_metrics.rx_hashes_nr += current_set_diff.size();
|
||||
auto fp = make_foreign(std::make_unique<std::unordered_set<repair_hash>>(std::move(current_set_diff)));
|
||||
return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id, needs_all_rows, fp = std::move(fp)] {
|
||||
auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
|
||||
@@ -2067,6 +2087,7 @@ future<> repair_init_messaging_service_handler(repair_service& rs, distributed<d
|
||||
std::unordered_set<repair_hash> set_diff, bool needs_all_rows) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
_metrics.rx_hashes_nr += set_diff.size();
|
||||
auto fp = make_foreign(std::make_unique<std::unordered_set<repair_hash>>(std::move(set_diff)));
|
||||
return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id, fp = std::move(fp), needs_all_rows] () mutable {
|
||||
auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
|
||||
@@ -2292,8 +2313,8 @@ private:
|
||||
// are identical, there is no need to transfer each and every
|
||||
// row hashes to the repair master.
|
||||
return master.get_combined_row_hash(_common_sync_boundary, _all_nodes[idx]).then([&, this, idx] (get_combined_row_hash_response resp) {
|
||||
rlogger.debug("Calling master.get_combined_row_hash for node {}, got combined_hash={}, rows_nr={}", _all_nodes[idx], resp.working_row_buf_combined_csum, resp.working_row_buf_nr);
|
||||
combined_hashes[idx]= std::move(resp.working_row_buf_combined_csum);
|
||||
rlogger.debug("Calling master.get_combined_row_hash for node {}, got combined_hash={}", _all_nodes[idx], resp);
|
||||
combined_hashes[idx]= std::move(resp);
|
||||
});
|
||||
}).get();
|
||||
|
||||
|
||||
@@ -931,7 +931,6 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
|
||||
});
|
||||
|
||||
return seastar::async([this, &m, updater = std::move(updater), real_dirty_acc = std::move(real_dirty_acc)] () mutable {
|
||||
coroutine update;
|
||||
size_t size_entry;
|
||||
// In case updater fails, we must bring the cache to consistency without deferring.
|
||||
auto cleanup = defer([&m, this] {
|
||||
@@ -939,6 +938,7 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
|
||||
_prev_snapshot_pos = {};
|
||||
_prev_snapshot = {};
|
||||
});
|
||||
coroutine update; // Destroy before cleanup to release snapshots before invalidating.
|
||||
partition_presence_checker is_present = _prev_snapshot->make_partition_presence_checker();
|
||||
while (!m.partitions.empty()) {
|
||||
with_allocator(_tracker.allocator(), [&] () {
|
||||
|
||||
@@ -109,7 +109,10 @@ std::optional<std::map<sstring, sstring>> schema_mutations::cdc_options() const
|
||||
if (_scylla_tables) {
|
||||
auto rs = query::result_set(*_scylla_tables);
|
||||
if (!rs.empty()) {
|
||||
return db::schema_tables::get_map<sstring, sstring>(rs.row(0), "cdc");
|
||||
auto map = db::schema_tables::get_map<sstring, sstring>(rs.row(0), "cdc");
|
||||
if (map && !map->empty()) {
|
||||
return map;
|
||||
}
|
||||
}
|
||||
}
|
||||
return { };
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 6f0ef32514...acd63c4791
@@ -93,6 +93,7 @@ void migration_manager::init_messaging_service()
|
||||
|
||||
_feature_listeners.push_back(ss.cluster_supports_view_virtual_columns().when_enabled(update_schema));
|
||||
_feature_listeners.push_back(ss.cluster_supports_digest_insensitive_to_expiry().when_enabled(update_schema));
|
||||
_feature_listeners.push_back(ss.cluster_supports_cdc().when_enabled(update_schema));
|
||||
|
||||
auto& ms = netw::get_local_messaging_service();
|
||||
ms.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector<frozen_mutation> fm, rpc::optional<std::vector<canonical_mutation>> cm) {
|
||||
@@ -311,7 +312,8 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr
|
||||
try {
|
||||
for (const auto& cm : canonical_mutations) {
|
||||
auto& tbl = db.find_column_family(cm.column_family_id());
|
||||
mutations.emplace_back(cm.to_mutation(tbl.schema()));
|
||||
mutations.emplace_back(cm.to_mutation(
|
||||
tbl.schema()));
|
||||
}
|
||||
} catch (no_such_column_family& e) {
|
||||
mlogger.error("Error while applying schema mutations from {}: {}", src, e);
|
||||
@@ -902,8 +904,9 @@ future<> migration_manager::announce(std::vector<mutation> mutations, bool annou
|
||||
future<> migration_manager::push_schema_mutation(const gms::inet_address& endpoint, const std::vector<mutation>& schema)
|
||||
{
|
||||
netw::messaging_service::msg_addr id{endpoint, 0};
|
||||
auto fm = std::vector<frozen_mutation>(schema.begin(), schema.end());
|
||||
auto cm = std::vector<canonical_mutation>(schema.begin(), schema.end());
|
||||
auto adjusted_schema = db::schema_tables::adjust_schema_for_schema_features(schema, get_local_storage_service().cluster_schema_features());
|
||||
auto fm = std::vector<frozen_mutation>(adjusted_schema.begin(), adjusted_schema.end());
|
||||
auto cm = std::vector<canonical_mutation>(adjusted_schema.begin(), adjusted_schema.end());
|
||||
return netw::get_local_messaging_service().send_definitions_update(id, std::move(fm), std::move(cm));
|
||||
}
|
||||
|
||||
|
||||
@@ -706,7 +706,9 @@ static future<std::optional<utils::UUID>> sleep_and_restart() {
|
||||
* nodes have seen the most recent commit. Otherwise, return null.
|
||||
*/
|
||||
future<utils::UUID> paxos_response_handler::begin_and_repair_paxos(client_state& cs, unsigned& contentions, bool is_write) {
|
||||
_proxy->get_db().local().get_config().check_experimental("Paxos");
|
||||
if (!_proxy->get_db().local().get_config().check_experimental(db::experimental_features_t::LWT)) {
|
||||
throw std::runtime_error("Paxos is currently disabled. Start Scylla with --experimental-features=lwt to enable.");
|
||||
}
|
||||
return do_with(api::timestamp_type(0), shared_from_this(), [this, &cs, &contentions, is_write]
|
||||
(api::timestamp_type& min_timestamp_micros_to_use, shared_ptr<paxos_response_handler>& prh) {
|
||||
return repeat_until_value([this, &contentions, &cs, &min_timestamp_micros_to_use, is_write] {
|
||||
@@ -1883,8 +1885,9 @@ storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token &
|
||||
});
|
||||
pending_endpoints.erase(itend, pending_endpoints.end());
|
||||
|
||||
size_t participants = pending_endpoints.size() + natural_endpoints.size();
|
||||
size_t required_participants = db::quorum_for(ks) + pending_endpoints.size();
|
||||
const size_t participants = pending_endpoints.size() + natural_endpoints.size();
|
||||
const size_t quorum_size = natural_endpoints.size() / 2 + 1;
|
||||
const size_t required_participants = quorum_size + pending_endpoints.size();
|
||||
|
||||
std::vector<gms::inet_address> live_endpoints;
|
||||
live_endpoints.reserve(participants);
|
||||
|
||||
@@ -344,12 +344,11 @@ std::set<sstring> storage_service::get_config_supported_features_set() {
|
||||
// This should only be true in tests (see cql_test_env.cc:storage_service_for_tests)
|
||||
auto& db = service::get_local_storage_service().db();
|
||||
if (db.local_is_initialized()) {
|
||||
auto& config = service::get_local_storage_service().db().local().get_config();
|
||||
auto& config = db.local().get_config();
|
||||
if (config.enable_sstables_mc_format()) {
|
||||
features.insert(MC_SSTABLE_FEATURE);
|
||||
}
|
||||
if (config.experimental()) {
|
||||
// push additional experimental features
|
||||
if (config.check_experimental(db::experimental_features_t::CDC)) {
|
||||
features.insert(CDC_FEATURE);
|
||||
}
|
||||
}
|
||||
@@ -3533,6 +3532,7 @@ db::schema_features storage_service::cluster_schema_features() const {
|
||||
f.set_if<db::schema_feature::VIEW_VIRTUAL_COLUMNS>(bool(_view_virtual_columns));
|
||||
f.set_if<db::schema_feature::DIGEST_INSENSITIVE_TO_EXPIRY>(bool(_digest_insensitive_to_expiry));
|
||||
f.set_if<db::schema_feature::COMPUTED_COLUMNS>(bool(_computed_columns));
|
||||
f.set_if<db::schema_feature::CDC_OPTIONS>(bool(_cdc_feature));
|
||||
return f;
|
||||
}
|
||||
|
||||
|
||||
@@ -2341,8 +2341,8 @@ public:
|
||||
return bool(_mc_sstable_feature);
|
||||
}
|
||||
|
||||
bool cluster_supports_cdc() const {
|
||||
return bool(_cdc_feature);
|
||||
const gms::feature& cluster_supports_cdc() const {
|
||||
return _cdc_feature;
|
||||
}
|
||||
|
||||
bool cluster_supports_row_level_repair() const {
|
||||
|
||||
@@ -2699,7 +2699,7 @@ entry_descriptor entry_descriptor::make_descriptor(sstring sstdir, sstring fname
|
||||
static std::regex la_mc("(la|mc)-(\\d+)-(\\w+)-(.*)");
|
||||
static std::regex ka("(\\w+)-(\\w+)-ka-(\\d+)-(.*)");
|
||||
|
||||
static std::regex dir(".*/([^/]*)/(\\w+)-[\\da-fA-F]+(?:/staging|/upload|/snapshots/[^/]+)?/?");
|
||||
static std::regex dir(".*/([^/]*)/([^/]+)-[\\da-fA-F]+(?:/staging|/upload|/snapshots/[^/]+)?/?");
|
||||
|
||||
std::smatch match;
|
||||
|
||||
|
||||
8
table.cc
8
table.cc
@@ -292,7 +292,7 @@ create_single_key_sstable_reader(column_family* cf,
|
||||
filter_sstable_for_reader(sstables->select(pr), *cf, schema, pr, key, slice)
|
||||
| boost::adaptors::transformed([&] (const sstables::shared_sstable& sstable) {
|
||||
tracing::trace(trace_state, "Reading key {} from sstable {}", pr, seastar::value_of([&sstable] { return sstable->get_filename(); }));
|
||||
return sstable->read_row_flat(schema, pr.start()->value(), slice, pc, resource_tracker, std::move(trace_state), fwd);
|
||||
return sstable->read_row_flat(schema, pr.start()->value(), slice, pc, resource_tracker, trace_state, fwd);
|
||||
})
|
||||
);
|
||||
if (readers.empty()) {
|
||||
@@ -315,7 +315,7 @@ flat_mutation_reader make_range_sstable_reader(schema_ptr s,
|
||||
{
|
||||
auto reader_factory_fn = [s, &slice, &pc, resource_tracker, trace_state, fwd, fwd_mr, &monitor_generator]
|
||||
(sstables::shared_sstable& sst, const dht::partition_range& pr) mutable {
|
||||
return sst->read_range_rows_flat(s, pr, slice, pc, resource_tracker, std::move(trace_state), fwd, fwd_mr, monitor_generator(sst));
|
||||
return sst->read_range_rows_flat(s, pr, slice, pc, resource_tracker, trace_state, fwd, fwd_mr, monitor_generator(sst));
|
||||
};
|
||||
return make_combined_reader(s, std::make_unique<incremental_reader_selector>(s,
|
||||
std::move(sstables),
|
||||
@@ -587,7 +587,7 @@ flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s,
|
||||
auto reader_factory_fn = [s, &slice, &pc, resource_tracker, trace_state, fwd, fwd_mr, &monitor_generator]
|
||||
(sstables::shared_sstable& sst, const dht::partition_range& pr) mutable {
|
||||
flat_mutation_reader reader = sst->read_range_rows_flat(s, pr, slice, pc,
|
||||
resource_tracker, std::move(trace_state), fwd, fwd_mr, monitor_generator(sst));
|
||||
resource_tracker, trace_state, fwd, fwd_mr, monitor_generator(sst));
|
||||
if (sst->is_shared()) {
|
||||
using sig = bool (&)(const dht::decorated_key&);
|
||||
reader = make_filtering_reader(std::move(reader), sig(belongs_to_current_shard));
|
||||
@@ -2543,7 +2543,7 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema
|
||||
std::move(slice),
|
||||
std::move(m),
|
||||
[base, views = std::move(views), lock = std::move(lock), this, timeout, source = std::move(source), &io_priority] (auto& pk, auto& slice, auto& m) mutable {
|
||||
auto reader = source.make_reader(base, pk, slice, io_priority);
|
||||
auto reader = source.make_reader(base, pk, slice, io_priority, nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
|
||||
return this->generate_and_propagate_view_updates(base, std::move(views), std::move(m), std::move(reader)).then([lock = std::move(lock)] () mutable {
|
||||
// return the local partition/row lock we have taken so it
|
||||
// remains locked until the caller is done modifying this
|
||||
|
||||
9
test.py
9
test.py
@@ -131,6 +131,7 @@ boost_tests = [
|
||||
'data_listeners_test',
|
||||
'truncation_migration_test',
|
||||
'like_matcher_test',
|
||||
'enum_option_test',
|
||||
]
|
||||
|
||||
other_tests = [
|
||||
@@ -265,7 +266,7 @@ if __name__ == "__main__":
|
||||
env['UBSAN_OPTIONS'] = 'print_stacktrace=1'
|
||||
env['BOOST_TEST_CATCH_SYSTEM_ERRORS'] = 'no'
|
||||
|
||||
def run_test(path, type, exec_args):
|
||||
def run_test(path, repeat, type, exec_args):
|
||||
boost_args = []
|
||||
# avoid modifying in-place, it will change test_to_run
|
||||
exec_args = exec_args + '--collectd 0'.split()
|
||||
@@ -274,7 +275,7 @@ if __name__ == "__main__":
|
||||
mode = 'release'
|
||||
if path.startswith(os.path.join('build', 'debug')):
|
||||
mode = 'debug'
|
||||
xmlout = (args.jenkins + "." + mode + "." + os.path.basename(path.split()[0]) + ".boost.xml")
|
||||
xmlout = (args.jenkins + "." + mode + "." + os.path.basename(path.split()[0]) + "." + str(repeat) + ".boost.xml")
|
||||
boost_args += ['--report_level=no', '--logger=HRF,test_suite:XML,test_suite,' + xmlout]
|
||||
if type == 'boost':
|
||||
boost_args += ['--']
|
||||
@@ -312,8 +313,8 @@ if __name__ == "__main__":
|
||||
path = test[0]
|
||||
test_type = test[1]
|
||||
exec_args = test[2] if len(test) >= 3 else []
|
||||
for _ in range(args.repeat):
|
||||
futures.append(executor.submit(run_test, path, test_type, exec_args))
|
||||
for repeat in range(args.repeat):
|
||||
futures.append(executor.submit(run_test, path, repeat, test_type, exec_args))
|
||||
|
||||
results = []
|
||||
cookie = len(futures)
|
||||
|
||||
@@ -844,14 +844,20 @@ inline std::basic_ostream<Args...> & operator<<(std::basic_ostream<Args...> & os
|
||||
}
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
void throw_on_error(const sstring& opt, const sstring& msg, std::optional<utils::config_file::value_status> status) {
|
||||
if (status != config::value_status::Invalid) {
|
||||
throw std::invalid_argument(msg + " : " + opt);
|
||||
}
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
SEASTAR_TEST_CASE(test_parse_yaml) {
|
||||
config cfg;
|
||||
|
||||
cfg.read_from_yaml(cassandra_conf, [](auto& opt, auto& msg, auto status) {
|
||||
if (status != config::value_status::Invalid) {
|
||||
throw std::invalid_argument(msg + " : " + opt);
|
||||
}
|
||||
});
|
||||
cfg.read_from_yaml(cassandra_conf, throw_on_error);
|
||||
|
||||
BOOST_CHECK_EQUAL(cfg.cluster_name(), "Test Cluster");
|
||||
BOOST_CHECK_EQUAL(cfg.cluster_name.is_set(), true);
|
||||
@@ -917,3 +923,78 @@ SEASTAR_TEST_CASE(test_parse_broken) {
|
||||
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
using ef = experimental_features_t;
|
||||
using features = std::vector<enum_option<ef>>;
|
||||
|
||||
SEASTAR_TEST_CASE(test_parse_experimental_features_cdc) {
|
||||
config cfg;
|
||||
cfg.read_from_yaml("experimental_features:\n - cdc\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::CDC});
|
||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_parse_experimental_features_lwt) {
|
||||
config cfg;
|
||||
cfg.read_from_yaml("experimental_features:\n - lwt\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::LWT});
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_parse_experimental_features_udf) {
|
||||
config cfg;
|
||||
cfg.read_from_yaml("experimental_features:\n - udf\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UDF});
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_parse_experimental_features_multiple) {
|
||||
config cfg;
|
||||
cfg.read_from_yaml("experimental_features:\n - cdc\n - lwt\n - cdc\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::CDC, ef::LWT, ef::CDC}));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_parse_experimental_features_invalid) {
|
||||
config cfg;
|
||||
using value_status = utils::config_file::value_status;
|
||||
cfg.read_from_yaml("experimental_features:\n - invalidoptiontvaluedonotuse\n",
|
||||
[&cfg] (const sstring& opt, const sstring& msg, std::optional<value_status> status) {
|
||||
BOOST_REQUIRE_EQUAL(opt, "experimental_features");
|
||||
BOOST_REQUIRE_NE(msg.find("line 2, column 7"), msg.npos);
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
});
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_parse_experimental_true) {
|
||||
config cfg;
|
||||
cfg.read_from_yaml("experimental: true", throw_on_error);
|
||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_parse_experimental_false) {
|
||||
config cfg;
|
||||
cfg.read_from_yaml("experimental: false", throw_on_error);
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -4070,6 +4070,8 @@ SEASTAR_TEST_CASE(test_like_operator_on_clustering_key) {
|
||||
require_rows(e, "select s from t where s like '%c' allow filtering", {{T("abc")}});
|
||||
cquery_nofail(e, "insert into t (p, s) values (2, 'acc')");
|
||||
require_rows(e, "select s from t where s like '%c' allow filtering", {{T("abc")}, {T("acc")}});
|
||||
cquery_nofail(e, "insert into t (p, s) values (2, 'acd')");
|
||||
require_rows(e, "select s from t where p = 2 and s like '%c' allow filtering", {{T("acc")}});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -352,7 +352,10 @@ public:
|
||||
cfg->view_hints_directory.set(data_dir_path + "/view_hints.dir");
|
||||
cfg->num_tokens.set(256);
|
||||
cfg->ring_delay_ms.set(500);
|
||||
cfg->experimental.set(true);
|
||||
auto features = cfg->experimental_features();
|
||||
features.emplace_back(db::experimental_features_t::CDC);
|
||||
features.emplace_back(db::experimental_features_t::LWT);
|
||||
cfg->experimental_features(features);
|
||||
cfg->shutdown_announce_in_ms.set(0);
|
||||
cfg->broadcast_to_all_shards().get();
|
||||
create_directories((data_dir_path + "/system").c_str());
|
||||
|
||||
175
tests/enum_option_test.cc
Normal file
175
tests/enum_option_test.cc
Normal file
@@ -0,0 +1,175 @@
|
||||
/*
|
||||
* Copyright (C) 2019 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define BOOST_TEST_MODULE core
|
||||
|
||||
#include <boost/test/unit_test.hpp>
|
||||
|
||||
#include <array>
|
||||
#include <boost/program_options.hpp>
|
||||
#include <map>
|
||||
#include <set>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
|
||||
#include "utils/enum_option.hh"
|
||||
|
||||
namespace po = boost::program_options;
|
||||
|
||||
namespace {
|
||||
|
||||
struct days {
|
||||
enum enumeration { Mo, Tu, We, Th, Fr, Sa, Su };
|
||||
static std::unordered_map<std::string, enumeration> map() {
|
||||
return {{"Mon", Mo}, {"Tue", Tu}, {"Wed", We}, {"Thu", Th}, {"Fri", Fr}, {"Sat", Sa}, {"Sun", Su}};
|
||||
}
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
enum_option<T> parse(const char* value) {
|
||||
po::options_description desc("Allowed options");
|
||||
desc.add_options()("opt", po::value<enum_option<T>>(), "Option");
|
||||
po::variables_map vm;
|
||||
const char* argv[] = {"$0", "--opt", value};
|
||||
po::store(po::parse_command_line(3, argv, desc), vm);
|
||||
return vm["opt"].as<enum_option<T>>();
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
std::string format(typename T::enumeration d) {
|
||||
std::ostringstream os;
|
||||
os << enum_option<T>(d);
|
||||
return os.str();
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_parsing) {
|
||||
BOOST_CHECK_EQUAL(parse<days>("Sun"), days::Su);
|
||||
BOOST_CHECK_EQUAL(parse<days>("Mon"), days::Mo);
|
||||
BOOST_CHECK_EQUAL(parse<days>("Tue"), days::Tu);
|
||||
BOOST_CHECK_EQUAL(parse<days>("Wed"), days::We);
|
||||
BOOST_CHECK_EQUAL(parse<days>("Thu"), days::Th);
|
||||
BOOST_CHECK_EQUAL(parse<days>("Fri"), days::Fr);
|
||||
BOOST_CHECK_EQUAL(parse<days>("Sat"), days::Sa);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_parsing_error) {
|
||||
BOOST_REQUIRE_THROW(parse<days>("Sunday"), po::invalid_option_value);
|
||||
BOOST_REQUIRE_THROW(parse<days>(""), po::invalid_option_value);
|
||||
BOOST_REQUIRE_THROW(parse<days>(" "), po::invalid_option_value);
|
||||
BOOST_REQUIRE_THROW(parse<days>(" Sun"), po::invalid_option_value);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_formatting) {
|
||||
BOOST_CHECK_EQUAL(format<days>(days::Mo), "Mon");
|
||||
BOOST_CHECK_EQUAL(format<days>(days::Tu), "Tue");
|
||||
BOOST_CHECK_EQUAL(format<days>(days::We), "Wed");
|
||||
BOOST_CHECK_EQUAL(format<days>(days::Th), "Thu");
|
||||
BOOST_CHECK_EQUAL(format<days>(days::Fr), "Fri");
|
||||
BOOST_CHECK_EQUAL(format<days>(days::Sa), "Sat");
|
||||
BOOST_CHECK_EQUAL(format<days>(days::Su), "Sun");
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_formatting_unknown) {
|
||||
BOOST_CHECK_EQUAL(format<days>(static_cast<days::enumeration>(77)), "?unknown");
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
struct names {
|
||||
enum enumeration { John, Jane, Jim };
|
||||
static std::map<std::string, enumeration> map() {
|
||||
return {{"John", John}, {"Jane", Jane}, {"James", Jim}};
|
||||
}
|
||||
};
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_ordered_map) {
|
||||
BOOST_CHECK_EQUAL(parse<names>("James"), names::Jim);
|
||||
BOOST_CHECK_EQUAL(format<names>(names::Jim), "James");
|
||||
BOOST_CHECK_EQUAL(parse<names>("John"), names::John);
|
||||
BOOST_CHECK_EQUAL(format<names>(names::John), "John");
|
||||
BOOST_CHECK_EQUAL(parse<names>("Jane"), names::Jane);
|
||||
BOOST_CHECK_EQUAL(format<names>(names::Jane), "Jane");
|
||||
BOOST_CHECK_THROW(parse<names>("Jimbo"), po::invalid_option_value);
|
||||
BOOST_CHECK_EQUAL(format<names>(static_cast<names::enumeration>(77)), "?unknown");
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
struct cities {
|
||||
enum enumeration { SF, TO, NY };
|
||||
static std::unordered_map<std::string, enumeration> map() {
|
||||
return {
|
||||
{"SanFrancisco", SF}, {"SF", SF}, {"SFO", SF}, {"Frisco", SF},
|
||||
{"Toronto", TO}, {"TO", TO}, {"YYZ", TO}, {"TheSix", TO},
|
||||
{"NewYork", NY}, {"NY", NY}, {"NYC", NY}, {"BigApple", NY},
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_multiple_parse) {
|
||||
BOOST_CHECK_EQUAL(parse<cities>("SanFrancisco"), cities::SF);
|
||||
BOOST_CHECK_EQUAL(parse<cities>("SF"), cities::SF);
|
||||
BOOST_CHECK_EQUAL(parse<cities>("SFO"), cities::SF);
|
||||
BOOST_CHECK_EQUAL(parse<cities>("Frisco"), cities::SF);
|
||||
BOOST_CHECK_EQUAL(parse<cities>("Toronto"), cities::TO);
|
||||
BOOST_CHECK_EQUAL(parse<cities>("TO"), cities::TO);
|
||||
BOOST_CHECK_EQUAL(parse<cities>("YYZ"), cities::TO);
|
||||
BOOST_CHECK_EQUAL(parse<cities>("TheSix"), cities::TO);
|
||||
BOOST_CHECK_EQUAL(parse<cities>("NewYork"), cities::NY);
|
||||
BOOST_CHECK_EQUAL(parse<cities>("NY"), cities::NY);
|
||||
BOOST_CHECK_EQUAL(parse<cities>("NYC"), cities::NY);
|
||||
BOOST_CHECK_EQUAL(parse<cities>("BigApple"), cities::NY);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_multiple_format) {
|
||||
BOOST_CHECK((std::set<std::string>{"SanFrancisco", "SF", "SFO", "Frisco"}).count(format<cities>(cities::SF)));
|
||||
BOOST_CHECK((std::set<std::string>{"Toronto", "TO", "YYZ", "TheSix"}).count(format<cities>(cities::TO)));
|
||||
BOOST_CHECK((std::set<std::string>{"NewYork", "NY", "NYC", "BigApple"}).count(format<cities>(cities::NY)));
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
struct numbers {
|
||||
enum enumeration { ONE, TWO };
|
||||
static std::unordered_map<int, enumeration> map() {
|
||||
return {{1, ONE}, {2, TWO}};
|
||||
}
|
||||
};
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_non_string) {
|
||||
BOOST_CHECK_EQUAL(parse<numbers>("1"), numbers::ONE);
|
||||
BOOST_CHECK_EQUAL(parse<numbers>("2"), numbers::TWO);
|
||||
BOOST_CHECK_THROW(parse<numbers>("3"), po::invalid_option_value);
|
||||
BOOST_CHECK_THROW(parse<numbers>("xx"), po::invalid_option_value);
|
||||
BOOST_CHECK_THROW(parse<numbers>(""), po::invalid_option_value);
|
||||
BOOST_CHECK_EQUAL(format<numbers>(numbers::ONE), "1");
|
||||
BOOST_CHECK_EQUAL(format<numbers>(numbers::TWO), "2");
|
||||
BOOST_CHECK_EQUAL(format<numbers>(static_cast<numbers::enumeration>(77)), "?unknown");
|
||||
}
|
||||
@@ -1320,6 +1320,104 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_upgrade_type_change) {
|
||||
assert_that(m).is_equal_to(m2);
|
||||
}
|
||||
|
||||
// This test checks the behavior of row_marker::{is_live, is_dead, compact_and_expire}. Those functions have some
|
||||
// duplicated logic that decides if a row is expired, and this test verifies that they behave the same with respect
|
||||
// to TTL.
|
||||
SEASTAR_THREAD_TEST_CASE(test_row_marker_expiry) {
|
||||
can_gc_fn never_gc = [] (tombstone) { return false; };
|
||||
|
||||
auto must_be_alive = [&] (row_marker mark, gc_clock::time_point t) {
|
||||
BOOST_TEST_MESSAGE(format("must_be_alive({}, {})", mark, t));
|
||||
BOOST_REQUIRE(mark.is_live(tombstone(), t));
|
||||
BOOST_REQUIRE(mark.is_missing() || !mark.is_dead(t));
|
||||
BOOST_REQUIRE(mark.compact_and_expire(tombstone(), t, never_gc, gc_clock::time_point()));
|
||||
};
|
||||
|
||||
auto must_be_dead = [&] (row_marker mark, gc_clock::time_point t) {
|
||||
BOOST_TEST_MESSAGE(format("must_be_dead({}, {})", mark, t));
|
||||
BOOST_REQUIRE(!mark.is_live(tombstone(), t));
|
||||
BOOST_REQUIRE(mark.is_missing() || mark.is_dead(t));
|
||||
BOOST_REQUIRE(!mark.compact_and_expire(tombstone(), t, never_gc, gc_clock::time_point()));
|
||||
};
|
||||
|
||||
const auto timestamp = api::timestamp_type(1);
|
||||
const auto t0 = gc_clock::now();
|
||||
const auto t1 = t0 + 1s;
|
||||
const auto t2 = t0 + 2s;
|
||||
const auto t3 = t0 + 3s;
|
||||
|
||||
// Without timestamp the marker is missing (doesn't exist)
|
||||
const row_marker m1;
|
||||
must_be_dead(m1, t0);
|
||||
must_be_dead(m1, t1);
|
||||
must_be_dead(m1, t2);
|
||||
must_be_dead(m1, t3);
|
||||
|
||||
// With timestamp and without ttl, a row_marker is always alive
|
||||
const row_marker m2(timestamp);
|
||||
must_be_alive(m2, t0);
|
||||
must_be_alive(m2, t1);
|
||||
must_be_alive(m2, t2);
|
||||
must_be_alive(m2, t3);
|
||||
|
||||
// A row_marker becomes dead exactly at the moment of expiry
|
||||
// Reproduces #4263, #5290
|
||||
const auto ttl = 1s;
|
||||
const row_marker m3(timestamp, ttl, t2);
|
||||
must_be_alive(m3, t0);
|
||||
must_be_alive(m3, t1);
|
||||
must_be_dead(m3, t2);
|
||||
must_be_dead(m3, t3);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_querying_expired_rows) {
|
||||
auto s = schema_builder("ks", "cf")
|
||||
.with_column("pk", bytes_type, column_kind::partition_key)
|
||||
.with_column("ck", bytes_type, column_kind::clustering_key)
|
||||
.build();
|
||||
|
||||
auto pk = partition_key::from_singular(*s, data_value(bytes("key1")));
|
||||
auto ckey1 = clustering_key::from_singular(*s, data_value(bytes("A")));
|
||||
auto ckey2 = clustering_key::from_singular(*s, data_value(bytes("B")));
|
||||
auto ckey3 = clustering_key::from_singular(*s, data_value(bytes("C")));
|
||||
|
||||
auto ttl = 1s;
|
||||
auto t0 = gc_clock::now();
|
||||
auto t1 = t0 + 1s;
|
||||
auto t2 = t0 + 2s;
|
||||
auto t3 = t0 + 3s;
|
||||
|
||||
auto results_at_time = [s] (const mutation& m, gc_clock::time_point t) {
|
||||
auto slice = partition_slice_builder(*s)
|
||||
.without_partition_key_columns()
|
||||
.build();
|
||||
auto opts = query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash};
|
||||
return query::result_set::from_raw_result(s, slice, m.query(slice, opts, t));
|
||||
};
|
||||
|
||||
mutation m(s, pk);
|
||||
m.partition().clustered_row(*m.schema(), ckey1).apply(row_marker(api::new_timestamp(), ttl, t1));
|
||||
m.partition().clustered_row(*m.schema(), ckey2).apply(row_marker(api::new_timestamp(), ttl, t2));
|
||||
m.partition().clustered_row(*m.schema(), ckey3).apply(row_marker(api::new_timestamp(), ttl, t3));
|
||||
|
||||
assert_that(results_at_time(m, t0))
|
||||
.has_size(3)
|
||||
.has(a_row().with_column("ck", data_value(bytes("A"))))
|
||||
.has(a_row().with_column("ck", data_value(bytes("B"))))
|
||||
.has(a_row().with_column("ck", data_value(bytes("C"))));
|
||||
|
||||
assert_that(results_at_time(m, t1))
|
||||
.has_size(2)
|
||||
.has(a_row().with_column("ck", data_value(bytes("B"))))
|
||||
.has(a_row().with_column("ck", data_value(bytes("C"))));
|
||||
|
||||
assert_that(results_at_time(m, t2))
|
||||
.has_size(1)
|
||||
.has(a_row().with_column("ck", data_value(bytes("C"))));
|
||||
|
||||
assert_that(results_at_time(m, t3)).is_empty();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_querying_expired_cells) {
|
||||
return seastar::async([] {
|
||||
auto s = schema_builder("ks", "cf")
|
||||
|
||||
@@ -371,10 +371,8 @@ SEASTAR_TEST_CASE(test_merging_does_not_alter_tables_which_didnt_change) {
|
||||
muts2.push_back(db::schema_tables::make_scylla_tables_mutation(s0, api::new_timestamp()));
|
||||
mm.announce(muts2).get();
|
||||
|
||||
// SCYLLA_TABLES have additional columns so announcing its mutation
|
||||
// changes the tables
|
||||
BOOST_REQUIRE(s1 != find_table().schema());
|
||||
BOOST_REQUIRE(legacy_version != find_table().schema()->version());
|
||||
BOOST_REQUIRE(s1 == find_table().schema());
|
||||
BOOST_REQUIRE_EQUAL(legacy_version, find_table().schema()->version());
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -575,7 +573,7 @@ SEASTAR_TEST_CASE(test_prepared_statement_is_invalidated_by_schema_change) {
|
||||
|
||||
// We don't want schema digest to change between Scylla versions because that results in a schema disagreement
|
||||
// during rolling upgrade.
|
||||
future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_dir, std::set<sstring> disabled_features, std::vector<utils::UUID> expected_digests) {
|
||||
future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_dir, std::set<sstring> disabled_features, std::vector<utils::UUID> expected_digests, std::function<void(cql_test_env& e)> extra_schema_changes) {
|
||||
using namespace db;
|
||||
using namespace db::schema_tables;
|
||||
|
||||
@@ -588,6 +586,7 @@ future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_
|
||||
|
||||
auto db_cfg_ptr = make_shared<db::config>();
|
||||
auto& db_cfg = *db_cfg_ptr;
|
||||
db_cfg.experimental_features({experimental_features_t::UDF}, db::config::config_source::CommandLine);
|
||||
if (regenerate) {
|
||||
db_cfg.data_file_directories({data_dir}, db::config::config_source::CommandLine);
|
||||
} else {
|
||||
@@ -597,7 +596,7 @@ future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_
|
||||
cql_test_config cfg_in(db_cfg_ptr);
|
||||
cfg_in.disabled_features = std::move(disabled_features);
|
||||
|
||||
return do_with_cql_env_thread([regenerate, expected_digests = std::move(expected_digests)](cql_test_env& e) {
|
||||
return do_with_cql_env_thread([regenerate, expected_digests = std::move(expected_digests), extra_schema_changes = std::move(extra_schema_changes)] (cql_test_env& e) {
|
||||
if (regenerate) {
|
||||
// Exercise many different kinds of schema changes.
|
||||
e.execute_cql(
|
||||
@@ -613,6 +612,7 @@ future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_
|
||||
e.execute_cql(
|
||||
"create keyspace tests2 with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };").get();
|
||||
e.execute_cql("drop keyspace tests2;").get();
|
||||
extra_schema_changes(e);
|
||||
}
|
||||
|
||||
auto expect_digest = [&] (schema_features sf, utils::UUID expected) {
|
||||
@@ -673,7 +673,7 @@ SEASTAR_TEST_CASE(test_schema_digest_does_not_change) {
|
||||
utils::UUID("1d91ad22-ea7c-3e7f-9557-87f0f3bb94d7"),
|
||||
utils::UUID("2dcd4a37-cbb5-399b-b3c9-8eb1398b096b")
|
||||
};
|
||||
return test_schema_digest_does_not_change_with_disabled_features("./tests/sstables/schema_digest_test", std::set<sstring>{"COMPUTED_COLUMNS"}, std::move(expected_digests));
|
||||
return test_schema_digest_does_not_change_with_disabled_features("./tests/sstables/schema_digest_test", std::set<sstring>{"COMPUTED_COLUMNS", "CDC"}, std::move(expected_digests), [] (cql_test_env& e) {});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_schema_digest_does_not_change_after_computed_columns) {
|
||||
@@ -688,5 +688,26 @@ SEASTAR_TEST_CASE(test_schema_digest_does_not_change_after_computed_columns) {
|
||||
utils::UUID("d58e5214-516e-3d0b-95b5-01ab71584a8d"),
|
||||
utils::UUID("e1b50bed-2ab8-3759-92c7-1f4288046ae6")
|
||||
};
|
||||
return test_schema_digest_does_not_change_with_disabled_features("./tests/sstables/schema_digest_test_computed_columns", std::set<sstring>{}, std::move(expected_digests));
|
||||
return test_schema_digest_does_not_change_with_disabled_features("./tests/sstables/schema_digest_test_computed_columns", std::set<sstring>{"CDC"}, std::move(expected_digests), [] (cql_test_env& e) {});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_schema_digest_does_not_change_with_cdc_options) {
|
||||
std::vector<utils::UUID> expected_digests{
|
||||
utils::UUID("a1f07f31-59d6-372a-8c94-7ea467354b39"),
|
||||
utils::UUID("524d418d-a2e2-3fc3-bf45-5fb79b33c7e4"),
|
||||
utils::UUID("524d418d-a2e2-3fc3-bf45-5fb79b33c7e4"),
|
||||
utils::UUID("018fccba-8050-3bb9-a0a5-2b3c5f0371fe"),
|
||||
utils::UUID("018fccba-8050-3bb9-a0a5-2b3c5f0371fe"),
|
||||
utils::UUID("58f4254e-cc3b-3d56-8a45-167f9a3ea423"),
|
||||
utils::UUID("48fda4f8-d7b5-3e59-a47a-7397989a9bf8"),
|
||||
utils::UUID("8049bcfe-eb01-3a59-af33-16cef8a34b45"),
|
||||
utils::UUID("2195a821-b2b8-3cb8-a179-2f5042e90841")
|
||||
};
|
||||
return test_schema_digest_does_not_change_with_disabled_features(
|
||||
"./tests/sstables/schema_digest_test_cdc_options",
|
||||
std::set<sstring>{},
|
||||
std::move(expected_digests),
|
||||
[] (cql_test_env& e) {
|
||||
e.execute_cql("create table tests.table_cdc (pk int primary key, c1 int, c2 int) with cdc = {'enabled':'true'};").get();
|
||||
});
|
||||
}
|
||||
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
547038858
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,9 @@
|
||||
CompressionInfo.db
|
||||
Filter.db
|
||||
Data.db
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Digest.crc32
|
||||
Scylla.db
|
||||
Index.db
|
||||
Summary.db
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
579241509
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,9 @@
|
||||
CompressionInfo.db
|
||||
Filter.db
|
||||
Data.db
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Digest.crc32
|
||||
Scylla.db
|
||||
Index.db
|
||||
Summary.db
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
2172984605
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,9 @@
|
||||
CompressionInfo.db
|
||||
Filter.db
|
||||
Data.db
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Digest.crc32
|
||||
Scylla.db
|
||||
Index.db
|
||||
Summary.db
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
2014320564
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,9 @@
|
||||
CompressionInfo.db
|
||||
Filter.db
|
||||
Data.db
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Digest.crc32
|
||||
Scylla.db
|
||||
Index.db
|
||||
Summary.db
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
1560009820
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,9 @@
|
||||
CompressionInfo.db
|
||||
Filter.db
|
||||
Data.db
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Digest.crc32
|
||||
Scylla.db
|
||||
Index.db
|
||||
Summary.db
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
3276779049
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user