Compare commits
32 Commits
next
...
scylla-3.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f9b11c9b30 | ||
|
|
798357f656 | ||
|
|
7eb86fbbb4 | ||
|
|
edf431f581 | ||
|
|
3f358c9772 | ||
|
|
c8d5738a48 | ||
|
|
de5c06414b | ||
|
|
de314dfe30 | ||
|
|
40a077bf93 | ||
|
|
d488e762cf | ||
|
|
637d80ffcf | ||
|
|
39b17be562 | ||
|
|
e54df0585e | ||
|
|
7d113bd1e9 | ||
|
|
2e7cd77bc4 | ||
|
|
0a38d2b0ee | ||
|
|
2ff26d1160 | ||
|
|
9dd714ae64 | ||
|
|
3980570520 | ||
|
|
9889e553e6 | ||
|
|
3e0b09faa1 | ||
|
|
bc4106ff45 | ||
|
|
df3563c1ae | ||
|
|
1c89961c4f | ||
|
|
85b1a45252 | ||
|
|
6a847e2242 | ||
|
|
10cf0e0d91 | ||
|
|
8c1474c039 | ||
|
|
bb5e9527bb | ||
|
|
4dae72b2cd | ||
|
|
1e444a3dd5 | ||
|
|
76906d6134 |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=666.development
|
||||
VERSION=3.2.0
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -74,7 +74,7 @@ public:
|
||||
options() = default;
|
||||
options(const std::map<sstring, sstring>& map) {
|
||||
if (map.find("enabled") == std::end(map)) {
|
||||
throw exceptions::configuration_exception("Missing enabled CDC option");
|
||||
return;
|
||||
}
|
||||
|
||||
for (auto& p : map) {
|
||||
@@ -92,6 +92,9 @@ public:
|
||||
}
|
||||
}
|
||||
std::map<sstring, sstring> to_map() const {
|
||||
if (!_enabled) {
|
||||
return {};
|
||||
}
|
||||
return {
|
||||
{ "enabled", _enabled ? "true" : "false" },
|
||||
{ "preimage", _preimage ? "true" : "false" },
|
||||
|
||||
@@ -241,7 +241,10 @@ batch_size_fail_threshold_in_kb: 50
|
||||
# broadcast_rpc_address: 1.2.3.4
|
||||
|
||||
# Uncomment to enable experimental features
|
||||
# experimental: true
|
||||
# experimental_features:
|
||||
# - cdc
|
||||
# - lwt
|
||||
# - udf
|
||||
|
||||
# The directory where hints files are stored if hinted handoff is enabled.
|
||||
# hints_directory: /var/lib/scylla/hints
|
||||
|
||||
@@ -381,6 +381,7 @@ scylla_tests = [
|
||||
'tests/data_listeners_test',
|
||||
'tests/truncation_migration_test',
|
||||
'tests/like_matcher_test',
|
||||
'tests/enum_option_test',
|
||||
]
|
||||
|
||||
perf_tests = [
|
||||
@@ -875,6 +876,7 @@ pure_boost_tests = set([
|
||||
'tests/top_k_test',
|
||||
'tests/small_vector_test',
|
||||
'tests/like_matcher_test',
|
||||
'tests/enum_option_test',
|
||||
])
|
||||
|
||||
tests_not_using_seastar_test_framework = set([
|
||||
|
||||
@@ -478,7 +478,7 @@ inline bool single_column_primary_key_restrictions<clustering_key>::needs_filter
|
||||
// 3. a SLICE restriction isn't on a last place
|
||||
column_id position = 0;
|
||||
for (const auto& restriction : _restrictions->restrictions() | boost::adaptors::map_values) {
|
||||
if (restriction->is_contains() || position != restriction->get_column_def().id) {
|
||||
if (restriction->is_contains() || restriction->is_LIKE() || position != restriction->get_column_def().id) {
|
||||
return true;
|
||||
}
|
||||
if (!restriction->is_slice()) {
|
||||
|
||||
@@ -111,7 +111,9 @@ lw_shared_ptr<query::read_command> cas_request::read_command() const {
|
||||
} else {
|
||||
ranges = query::clustering_range::deoverlap(std::move(ranges), clustering_key::tri_compare(*_schema));
|
||||
}
|
||||
query::partition_slice ps(std::move(ranges), *_schema, columns_to_read, update_parameters::options);
|
||||
auto options = update_parameters::options;
|
||||
options.set(query::partition_slice::option::always_return_static_content);
|
||||
query::partition_slice ps(std::move(ranges), *_schema, columns_to_read, options);
|
||||
ps.set_partition_row_limit(max_rows);
|
||||
return make_lw_shared<query::read_command>(_schema->id(), _schema->version(), std::move(ps));
|
||||
}
|
||||
|
||||
@@ -60,7 +60,6 @@ public:
|
||||
static constexpr query::partition_slice::option_set options = query::partition_slice::option_set::of<
|
||||
query::partition_slice::option::send_partition_key,
|
||||
query::partition_slice::option::send_clustering_key,
|
||||
query::partition_slice::option::always_return_static_content,
|
||||
query::partition_slice::option::collections_as_maps>();
|
||||
|
||||
// Holder for data for
|
||||
|
||||
@@ -1984,7 +1984,8 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
|
||||
return make_multishard_combining_reader(make_shared<streaming_reader_lifecycle_policy>(db), partitioner, std::move(s), pr, ps, pc,
|
||||
std::move(trace_state), fwd_mr);
|
||||
});
|
||||
return make_flat_multi_range_reader(std::move(schema), std::move(ms), std::move(range_generator), schema->full_slice(),
|
||||
auto&& full_slice = schema->full_slice();
|
||||
return make_flat_multi_range_reader(std::move(schema), std::move(ms), std::move(range_generator), std::move(full_slice),
|
||||
service::get_local_streaming_read_priority(), {}, mutation_reader::forwarding::no);
|
||||
}
|
||||
|
||||
|
||||
@@ -1241,6 +1241,34 @@ void db::commitlog::segment_manager::flush_segments(bool force) {
|
||||
}
|
||||
}
|
||||
|
||||
/// \brief Helper for ensuring a file is closed if an exception is thrown.
|
||||
///
|
||||
/// The file provided by the file_fut future is passed to func.
|
||||
/// * If func throws an exception E, the file is closed and we return
|
||||
/// a failed future with E.
|
||||
/// * If func returns a value V, the file is not closed and we return
|
||||
/// a future with V.
|
||||
/// Note that when an exception is not thrown, it is the
|
||||
/// responsibility of func to make sure the file will be closed. It
|
||||
/// can close the file itself, return it, or store it somewhere.
|
||||
///
|
||||
/// \tparam Func The type of function this wraps
|
||||
/// \param file_fut A future that produces a file
|
||||
/// \param func A function that uses a file
|
||||
/// \return A future that passes the file produced by file_fut to func
|
||||
/// and closes it if func fails
|
||||
template <typename Func>
|
||||
static auto close_on_failure(future<file> file_fut, Func func) {
|
||||
return file_fut.then([func = std::move(func)](file f) {
|
||||
return futurize_apply(func, f).handle_exception([f] (std::exception_ptr e) mutable {
|
||||
return f.close().then_wrapped([f, e = std::move(e)] (future<> x) {
|
||||
using futurator = futurize<std::result_of_t<Func(file)>>;
|
||||
return futurator::make_exception_future(e);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager::allocate_segment_ex(const descriptor& d, sstring filename, open_flags flags, bool active) {
|
||||
file_open_options opt;
|
||||
opt.extent_allocation_size_hint = max_size;
|
||||
@@ -1258,7 +1286,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
|
||||
return fut;
|
||||
});
|
||||
|
||||
return fut.then([this, d, active, filename, flags](file f) {
|
||||
return close_on_failure(std::move(fut), [this, d, active, filename, flags] (file f) {
|
||||
f = make_checked_file(commit_error_handler, f);
|
||||
// xfs doesn't like files extended betond eof, so enlarge the file
|
||||
auto fut = make_ready_future<>();
|
||||
|
||||
@@ -276,7 +276,7 @@ future<> db::commitlog_replayer::impl::process(stats* s, commitlog::buffer_and_r
|
||||
}
|
||||
|
||||
auto shard = _db.local().shard_of(fm);
|
||||
return _db.invoke_on(shard, [this, cer = std::move(cer), &src_cm, rp, shard, s] (database& db) -> future<> {
|
||||
return _db.invoke_on(shard, [this, cer = std::move(cer), &src_cm, rp, shard, s] (database& db) mutable -> future<> {
|
||||
auto& fm = cer.mutation();
|
||||
// TODO: might need better verification that the deserialized mutation
|
||||
// is schema compatible. My guess is that just applying the mutation
|
||||
@@ -306,7 +306,9 @@ future<> db::commitlog_replayer::impl::process(stats* s, commitlog::buffer_and_r
|
||||
return db.apply_in_memory(m, cf, db::rp_handle(), db::no_timeout);
|
||||
});
|
||||
} else {
|
||||
return db.apply_in_memory(fm, cf.schema(), db::rp_handle(), db::no_timeout);
|
||||
return do_with(std::move(cer).mutation(), [&](const frozen_mutation& m) {
|
||||
return db.apply_in_memory(m, cf.schema(), db::rp_handle(), db::no_timeout);
|
||||
});
|
||||
}
|
||||
}).then_wrapped([s] (future<> f) {
|
||||
try {
|
||||
|
||||
39
db/config.cc
39
db/config.cc
@@ -22,6 +22,7 @@
|
||||
|
||||
#include <unordered_map>
|
||||
#include <regex>
|
||||
#include <sstream>
|
||||
|
||||
#include <boost/any.hpp>
|
||||
#include <boost/program_options.hpp>
|
||||
@@ -108,6 +109,10 @@ const config_type config_type_for<int32_t> = config_type("integer", value_to_jso
|
||||
template <>
|
||||
const config_type config_type_for<db::seed_provider_type> = config_type("seed provider", seed_provider_to_json);
|
||||
|
||||
template <>
|
||||
const config_type config_type_for<std::vector<enum_option<db::experimental_features_t>>> = config_type(
|
||||
"experimental features", value_to_json<std::vector<sstring>>);
|
||||
|
||||
}
|
||||
|
||||
namespace YAML {
|
||||
@@ -153,6 +158,23 @@ struct convert<db::config::seed_provider_type> {
|
||||
}
|
||||
};
|
||||
|
||||
template <>
|
||||
class convert<enum_option<db::experimental_features_t>> {
|
||||
public:
|
||||
static bool decode(const Node& node, enum_option<db::experimental_features_t>& rhs) {
|
||||
std::string name;
|
||||
if (!convert<std::string>::decode(node, name)) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
std::istringstream(name) >> rhs;
|
||||
} catch (boost::program_options::invalid_option_value&) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#if defined(DEBUG)
|
||||
@@ -669,7 +691,8 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, shutdown_announce_in_ms(this, "shutdown_announce_in_ms", value_status::Used, 2 * 1000, "Time a node waits after sending gossip shutdown message in milliseconds. Same as -Dcassandra.shutdown_announce_in_ms in cassandra.")
|
||||
, developer_mode(this, "developer_mode", value_status::Used, false, "Relax environment checks. Setting to true can reduce performance and reliability significantly.")
|
||||
, skip_wait_for_gossip_to_settle(this, "skip_wait_for_gossip_to_settle", value_status::Used, -1, "An integer to configure the wait for gossip to settle. -1: wait normally, 0: do not wait at all, n: wait for at most n polls. Same as -Dcassandra.skip_wait_for_gossip_to_settle in cassandra.")
|
||||
, experimental(this, "experimental", value_status::Used, false, "Set to true to unlock experimental features.")
|
||||
, experimental(this, "experimental", value_status::Used, false, "Set to true to unlock all experimental features.")
|
||||
, experimental_features(this, "experimental_features", value_status::Used, {}, "Unlock experimental features provided as the option arguments (possible values: 'lwt', 'cdc', 'udf'). Can be repeated.")
|
||||
, lsa_reclamation_step(this, "lsa_reclamation_step", value_status::Used, 1, "Minimum number of segments to reclaim in a single step")
|
||||
, prometheus_port(this, "prometheus_port", value_status::Used, 9180, "Prometheus port, set to zero to disable")
|
||||
, prometheus_address(this, "prometheus_address", value_status::Used, "0.0.0.0", "Prometheus listening address")
|
||||
@@ -779,10 +802,12 @@ db::fs::path db::config::get_conf_dir() {
|
||||
return confdir;
|
||||
}
|
||||
|
||||
void db::config::check_experimental(const sstring& what) const {
|
||||
if (!experimental()) {
|
||||
throw std::runtime_error(format("{} is currently disabled. Start Scylla with --experimental=on to enable.", what));
|
||||
bool db::config::check_experimental(experimental_features_t::feature f) const {
|
||||
if (experimental()) {
|
||||
return true;
|
||||
}
|
||||
const auto& optval = experimental_features();
|
||||
return find(begin(optval), end(optval), enum_option<experimental_features_t>{f}) != end(optval);
|
||||
}
|
||||
|
||||
namespace bpo = boost::program_options;
|
||||
@@ -827,6 +852,12 @@ const db::extensions& db::config::extensions() const {
|
||||
return *_extensions;
|
||||
}
|
||||
|
||||
std::unordered_map<sstring, db::experimental_features_t::feature> db::experimental_features_t::map() {
|
||||
// We decided against using the construct-on-first-use idiom here:
|
||||
// https://github.com/scylladb/scylla/pull/5369#discussion_r353614807
|
||||
return {{"lwt", LWT}, {"udf", UDF}, {"cdc", CDC}};
|
||||
}
|
||||
|
||||
template struct utils::config_file::named_value<seastar::log_level>;
|
||||
|
||||
namespace utils {
|
||||
|
||||
12
db/config.hh
12
db/config.hh
@@ -33,6 +33,7 @@
|
||||
|
||||
#include "seastarx.hh"
|
||||
#include "utils/config_file.hh"
|
||||
#include "utils/enum_option.hh"
|
||||
|
||||
namespace seastar { class file; struct logging_settings; }
|
||||
|
||||
@@ -75,14 +76,20 @@ sstring config_value_as_json(const std::unordered_map<sstring, log_level>& v);
|
||||
|
||||
namespace db {
|
||||
|
||||
/// Enumeration of all valid values for the `experimental` config entry.
|
||||
struct experimental_features_t {
|
||||
enum feature { LWT, UDF, CDC };
|
||||
static std::unordered_map<sstring, feature> map(); // See enum_option.
|
||||
};
|
||||
|
||||
class config : public utils::config_file {
|
||||
public:
|
||||
config();
|
||||
config(std::shared_ptr<db::extensions>);
|
||||
~config();
|
||||
|
||||
// Throws exception if experimental feature is disabled.
|
||||
void check_experimental(const sstring& what) const;
|
||||
/// True iff the feature is enabled.
|
||||
bool check_experimental(experimental_features_t::feature f) const;
|
||||
|
||||
/**
|
||||
* Scans the environment variables for configuration files directory
|
||||
@@ -263,6 +270,7 @@ public:
|
||||
named_value<bool> developer_mode;
|
||||
named_value<int32_t> skip_wait_for_gossip_to_settle;
|
||||
named_value<bool> experimental;
|
||||
named_value<std::vector<enum_option<experimental_features_t>>> experimental_features;
|
||||
named_value<size_t> lsa_reclamation_step;
|
||||
named_value<uint16_t> prometheus_port;
|
||||
named_value<sstring> prometheus_address;
|
||||
|
||||
@@ -33,12 +33,14 @@ enum class schema_feature {
|
||||
// See https://github.com/scylladb/scylla/issues/4485
|
||||
DIGEST_INSENSITIVE_TO_EXPIRY,
|
||||
COMPUTED_COLUMNS,
|
||||
CDC_OPTIONS,
|
||||
};
|
||||
|
||||
using schema_features = enum_set<super_enum<schema_feature,
|
||||
schema_feature::VIEW_VIRTUAL_COLUMNS,
|
||||
schema_feature::DIGEST_INSENSITIVE_TO_EXPIRY,
|
||||
schema_feature::COMPUTED_COLUMNS
|
||||
schema_feature::COMPUTED_COLUMNS,
|
||||
schema_feature::CDC_OPTIONS
|
||||
>>;
|
||||
|
||||
}
|
||||
|
||||
@@ -294,19 +294,24 @@ schema_ptr tables() {
|
||||
}
|
||||
|
||||
// Holds Scylla-specific table metadata.
|
||||
schema_ptr scylla_tables() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_ptr scylla_tables(schema_features features) {
|
||||
static auto make = [] (bool has_cdc_options) -> schema_ptr {
|
||||
auto id = generate_legacy_id(NAME, SCYLLA_TABLES);
|
||||
return schema_builder(NAME, SCYLLA_TABLES, std::make_optional(id))
|
||||
auto sb = schema_builder(NAME, SCYLLA_TABLES, std::make_optional(id))
|
||||
.with_column("keyspace_name", utf8_type, column_kind::partition_key)
|
||||
.with_column("table_name", utf8_type, column_kind::clustering_key)
|
||||
.with_column("version", uuid_type)
|
||||
.with_column("cdc", map_type_impl::get_instance(utf8_type, utf8_type, false))
|
||||
.set_gc_grace_seconds(schema_gc_grace)
|
||||
.with_version(generate_schema_version(id))
|
||||
.build();
|
||||
}();
|
||||
return schema;
|
||||
.set_gc_grace_seconds(schema_gc_grace);
|
||||
if (has_cdc_options) {
|
||||
sb.with_column("cdc", map_type_impl::get_instance(utf8_type, utf8_type, false));
|
||||
sb.with_version(generate_schema_version(id, 1));
|
||||
} else {
|
||||
sb.with_version(generate_schema_version(id));
|
||||
}
|
||||
return sb.build();
|
||||
};
|
||||
static thread_local schema_ptr schemas[2] = { make(false), make(true) };
|
||||
return schemas[features.contains(schema_feature::CDC_OPTIONS)];
|
||||
}
|
||||
|
||||
// The "columns" table lists the definitions of all columns in all tables
|
||||
@@ -608,14 +613,28 @@ schema_ptr aggregates() {
|
||||
}
|
||||
#endif
|
||||
|
||||
static
|
||||
mutation
|
||||
redact_columns_for_missing_features(mutation m, schema_features features) {
|
||||
if (features.contains(schema_feature::CDC_OPTIONS)) {
|
||||
return std::move(m);
|
||||
}
|
||||
if (m.schema()->cf_name() != SCYLLA_TABLES) {
|
||||
return std::move(m);
|
||||
}
|
||||
slogger.debug("adjusting schema_tables mutation due to possible in-progress cluster upgrade");
|
||||
m.upgrade(scylla_tables(features));
|
||||
return std::move(m);
|
||||
}
|
||||
|
||||
/**
|
||||
* Read schema from system keyspace and calculate MD5 digest of every row, resulting digest
|
||||
* will be converted into UUID which would act as content-based version of the schema.
|
||||
*/
|
||||
future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features features)
|
||||
{
|
||||
auto map = [&proxy] (sstring table) {
|
||||
return db::system_keyspace::query_mutations(proxy, NAME, table).then([&proxy, table] (auto rs) {
|
||||
auto map = [&proxy, features] (sstring table) {
|
||||
return db::system_keyspace::query_mutations(proxy, NAME, table).then([&proxy, table, features] (auto rs) {
|
||||
auto s = proxy.local().get_db().local().find_schema(NAME, table);
|
||||
std::vector<mutation> mutations;
|
||||
for (auto&& p : rs->partitions()) {
|
||||
@@ -624,6 +643,7 @@ future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>&
|
||||
if (is_system_keyspace(partition_key)) {
|
||||
continue;
|
||||
}
|
||||
mut = redact_columns_for_missing_features(std::move(mut), features);
|
||||
mutations.emplace_back(std::move(mut));
|
||||
}
|
||||
return mutations;
|
||||
@@ -647,8 +667,8 @@ future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>&
|
||||
|
||||
future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<service::storage_proxy>& proxy, schema_features features)
|
||||
{
|
||||
auto map = [&proxy] (sstring table) {
|
||||
return db::system_keyspace::query_mutations(proxy, NAME, table).then([&proxy, table] (auto rs) {
|
||||
auto map = [&proxy, features] (sstring table) {
|
||||
return db::system_keyspace::query_mutations(proxy, NAME, table).then([&proxy, table, features] (auto rs) {
|
||||
auto s = proxy.local().get_db().local().find_schema(NAME, table);
|
||||
std::vector<canonical_mutation> results;
|
||||
for (auto&& p : rs->partitions()) {
|
||||
@@ -657,6 +677,7 @@ future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<
|
||||
if (is_system_keyspace(partition_key)) {
|
||||
continue;
|
||||
}
|
||||
mut = redact_columns_for_missing_features(std::move(mut), features);
|
||||
results.emplace_back(mut);
|
||||
}
|
||||
return results;
|
||||
@@ -669,6 +690,14 @@ future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<
|
||||
return map_reduce(all_table_names(features), map, std::vector<canonical_mutation>{}, reduce);
|
||||
}
|
||||
|
||||
std::vector<mutation>
|
||||
adjust_schema_for_schema_features(std::vector<mutation> schema, schema_features features) {
|
||||
for (auto& m : schema) {
|
||||
m = redact_columns_for_missing_features(m, features);
|
||||
}
|
||||
return std::move(schema);
|
||||
}
|
||||
|
||||
future<schema_result>
|
||||
read_schema_for_keyspaces(distributed<service::storage_proxy>& proxy, const sstring& schema_table_name, const std::set<sstring>& keyspace_names)
|
||||
{
|
||||
@@ -1673,7 +1702,19 @@ mutation make_scylla_tables_mutation(schema_ptr table, api::timestamp_type times
|
||||
auto ckey = clustering_key::from_singular(*s, table->cf_name());
|
||||
mutation m(scylla_tables(), pkey);
|
||||
m.set_clustered_cell(ckey, "version", utils::UUID(table->version()), timestamp);
|
||||
store_map(m, ckey, "cdc", timestamp, table->cdc_options().to_map());
|
||||
auto cdc_options = table->cdc_options().to_map();
|
||||
if (!cdc_options.empty()) {
|
||||
store_map(m, ckey, "cdc", timestamp, cdc_options);
|
||||
} else {
|
||||
// Avoid storing anything for cdc disabled, so we don't end up with
|
||||
// different digests on different nodes due to the other node redacting
|
||||
// the cdc column when the cdc cluster feature is disabled.
|
||||
//
|
||||
// Tombstones are not considered for schema digest, so this is okay (and
|
||||
// needed in order for disabling of cdc to have effect).
|
||||
auto& cdc_cdef = *scylla_tables()->get_column_definition("cdc");
|
||||
m.set_clustered_cell(ckey, cdc_cdef, atomic_cell::make_dead(timestamp, gc_clock::now()));
|
||||
}
|
||||
return m;
|
||||
}
|
||||
|
||||
|
||||
@@ -109,7 +109,7 @@ schema_ptr view_virtual_columns();
|
||||
schema_ptr dropped_columns();
|
||||
schema_ptr indexes();
|
||||
schema_ptr tables();
|
||||
schema_ptr scylla_tables();
|
||||
schema_ptr scylla_tables(schema_features features = schema_features::full());
|
||||
schema_ptr views();
|
||||
schema_ptr computed_columns();
|
||||
|
||||
@@ -154,6 +154,7 @@ future<> save_system_keyspace_schema();
|
||||
future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features);
|
||||
|
||||
future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<service::storage_proxy>& proxy, schema_features);
|
||||
std::vector<mutation> adjust_schema_for_schema_features(std::vector<mutation> schema, schema_features features);
|
||||
|
||||
future<schema_result_value_type>
|
||||
read_schema_partition_for_keyspace(distributed<service::storage_proxy>& proxy, const sstring& schema_table_name, const sstring& keyspace_name);
|
||||
|
||||
@@ -104,10 +104,10 @@ api::timestamp_type schema_creation_timestamp() {
|
||||
// FIXME: Make automatic by calculating from schema structure.
|
||||
static const uint16_t version_sequence_number = 1;
|
||||
|
||||
table_schema_version generate_schema_version(utils::UUID table_id) {
|
||||
table_schema_version generate_schema_version(utils::UUID table_id, uint16_t offset) {
|
||||
md5_hasher h;
|
||||
feed_hash(h, table_id);
|
||||
feed_hash(h, version_sequence_number);
|
||||
feed_hash(h, version_sequence_number + offset);
|
||||
return utils::UUID_gen::get_name_UUID(h.finalize());
|
||||
}
|
||||
|
||||
|
||||
@@ -152,7 +152,7 @@ schema_ptr aggregates();
|
||||
|
||||
}
|
||||
|
||||
table_schema_version generate_schema_version(utils::UUID table_id);
|
||||
table_schema_version generate_schema_version(utils::UUID table_id, uint16_t offset = 0);
|
||||
|
||||
// Only for testing.
|
||||
void minimal_setup(distributed<database>& db, distributed<cql3::query_processor>& qp);
|
||||
|
||||
3
dist/common/scripts/scylla_prepare
vendored
3
dist/common/scripts/scylla_prepare
vendored
@@ -63,7 +63,7 @@ if __name__ == '__main__':
|
||||
run('ip link set dev {TAP} master {BRIDGE}'.format(TAP=tap, BRIDGE=bridge))
|
||||
run('chown {USER}.{GROUP} /dev/vhost-net'.format(USER=user, GROUP=group))
|
||||
elif mode == 'dpdk':
|
||||
ethpcciid = cfg.get('ETHPCIID')
|
||||
ethpciid = cfg.get('ETHPCIID')
|
||||
nr_hugepages = cfg.get('NR_HUGEPAGES')
|
||||
run('modprobe uio')
|
||||
run('modprobe uio_pci_generic')
|
||||
@@ -73,7 +73,6 @@ if __name__ == '__main__':
|
||||
f.write(nr_hugepages)
|
||||
if dist_name() == 'Ubuntu':
|
||||
run('hugeadm --create-mounts')
|
||||
fi
|
||||
else:
|
||||
set_nic_and_disks = get_set_nic_and_disks_config_value(cfg)
|
||||
ifname = cfg.get('IFNAME')
|
||||
|
||||
1
dist/debian/debian/scylla-server.install
vendored
1
dist/debian/debian/scylla-server.install
vendored
@@ -21,6 +21,7 @@ opt/scylladb/scripts/libexec/*
|
||||
opt/scylladb/bin/*
|
||||
opt/scylladb/libreloc/*
|
||||
opt/scylladb/libexec/*
|
||||
usr/lib/scylla/*
|
||||
var/lib/scylla/data
|
||||
var/lib/scylla/commitlog
|
||||
var/lib/scylla/hints
|
||||
|
||||
4
dist/debian/debian/scylla-server.postinst
vendored
4
dist/debian/debian/scylla-server.postinst
vendored
@@ -24,10 +24,6 @@ if [ "$1" = configure ]; then
|
||||
fi
|
||||
|
||||
ln -sfT /etc/scylla /var/lib/scylla/conf
|
||||
if [ -d /usr/lib/scylla ]; then
|
||||
mv /usr/lib/scylla /usr/lib/scylla.old
|
||||
fi
|
||||
ln -sfT /opt/scylladb/scripts /usr/lib/scylla
|
||||
|
||||
grep -v api_ui_dir /etc/scylla/scylla.yaml | grep -v api_doc_dir > /tmp/scylla.yaml
|
||||
echo "api_ui_dir: /opt/scylladb/swagger-ui/dist/" >> /tmp/scylla.yaml
|
||||
|
||||
2
dist/docker/redhat/Dockerfile
vendored
2
dist/docker/redhat/Dockerfile
vendored
@@ -5,7 +5,7 @@ MAINTAINER Avi Kivity <avi@cloudius-systems.com>
|
||||
ENV container docker
|
||||
|
||||
# The SCYLLA_REPO_URL argument specifies the URL to the RPM repository this Docker image uses to install Scylla. The default value is the Scylla's unstable RPM repository, which contains the daily build.
|
||||
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo
|
||||
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/branch-3.2/latest/scylla.repo
|
||||
|
||||
ADD scylla_bashrc /scylla_bashrc
|
||||
|
||||
|
||||
2
dist/docker/redhat/scylla-jmx-service.sh
vendored
2
dist/docker/redhat/scylla-jmx-service.sh
vendored
@@ -2,4 +2,4 @@
|
||||
|
||||
source /etc/sysconfig/scylla-jmx
|
||||
|
||||
exec /opt/scylladb/scripts/jmx/scylla-jmx -l /opt/scylladb/scripts/jmx
|
||||
exec /opt/scylladb/jmx/scylla-jmx -l /opt/scylladb/jmx
|
||||
|
||||
10
dist/redhat/scylla.spec.mustache
vendored
10
dist/redhat/scylla.spec.mustache
vendored
@@ -15,6 +15,8 @@ Obsoletes: scylla-server < 1.1
|
||||
%global __brp_python_bytecompile %{nil}
|
||||
%global __brp_mangle_shebangs %{nil}
|
||||
|
||||
%undefine _find_debuginfo_dwz_opts
|
||||
|
||||
%description
|
||||
Scylla is a highly scalable, eventually consistent, distributed,
|
||||
partitioned row DB.
|
||||
@@ -75,9 +77,6 @@ getent passwd scylla || /usr/sbin/useradd -g scylla -s /sbin/nologin -r -d %{_sh
|
||||
if [ -f /etc/systemd/coredump.conf ];then
|
||||
/opt/scylladb/scripts/scylla_coredump_setup
|
||||
fi
|
||||
if [ -d /usr/lib/scylla ]; then
|
||||
mv /usr/lib/scylla /usr/lib/scylla.old
|
||||
fi
|
||||
|
||||
/opt/scylladb/scripts/scylla_post_install.sh
|
||||
|
||||
@@ -95,10 +94,6 @@ if [ -d /tmp/%{name}-%{version}-%{release} ]; then
|
||||
rm -rf /tmp/%{name}-%{version}-%{release}/
|
||||
fi
|
||||
ln -sfT /etc/scylla /var/lib/scylla/conf
|
||||
if [ -d /usr/lib/scylla ]; then
|
||||
mv /usr/lib/scylla /usr/lib/scylla.old
|
||||
fi
|
||||
ln -sfT /opt/scylladb/scripts /usr/lib/scylla
|
||||
|
||||
%clean
|
||||
rm -rf $RPM_BUILD_ROOT
|
||||
@@ -130,6 +125,7 @@ rm -rf $RPM_BUILD_ROOT
|
||||
/opt/scylladb/bin/*
|
||||
/opt/scylladb/libreloc/*
|
||||
/opt/scylladb/libexec/*
|
||||
%{_prefix}/lib/scylla/*
|
||||
%attr(0755,scylla,scylla) %dir %{_sharedstatedir}/scylla/
|
||||
%attr(0755,scylla,scylla) %dir %{_sharedstatedir}/scylla/data
|
||||
%attr(0755,scylla,scylla) %dir %{_sharedstatedir}/scylla/commitlog
|
||||
|
||||
@@ -69,11 +69,6 @@ struct get_sync_boundary_response {
|
||||
uint64_t new_rows_nr;
|
||||
};
|
||||
|
||||
struct get_combined_row_hash_response {
|
||||
repair_hash working_row_buf_combined_csum;
|
||||
uint64_t working_row_buf_nr;
|
||||
};
|
||||
|
||||
enum class row_level_diff_detect_algorithm : uint8_t {
|
||||
send_full_set,
|
||||
send_full_set_rpc_stream,
|
||||
|
||||
14
install.sh
14
install.sh
@@ -219,6 +219,20 @@ EOS
|
||||
for i in $SBINFILES; do
|
||||
ln -srf "$rprefix/scripts/$i" "$rusr/sbin/$i"
|
||||
done
|
||||
|
||||
# we need keep /usr/lib/scylla directory to support upgrade/downgrade
|
||||
# without error, so we need to create symlink for each script on the
|
||||
# directory
|
||||
install -m755 -d "$rusr"/lib/scylla/scyllatop/views
|
||||
for i in $(find "$rprefix"/scripts/ -maxdepth 1 -type f); do
|
||||
ln -srf $i "$rusr"/lib/scylla/
|
||||
done
|
||||
for i in $(find "$rprefix"/scyllatop/ -maxdepth 1 -type f); do
|
||||
ln -srf $i "$rusr"/lib/scylla/scyllatop
|
||||
done
|
||||
for i in $(find "$rprefix"/scyllatop/views -maxdepth 1 -type f); do
|
||||
ln -srf $i "$rusr"/lib/scylla/scyllatop/views
|
||||
done
|
||||
else
|
||||
install -m755 -d "$rdata"/saved_caches
|
||||
install -d -m755 "$retc"/systemd/system/scylla-server.service.d
|
||||
|
||||
@@ -1876,7 +1876,7 @@ bool row_marker::compact_and_expire(tombstone tomb, gc_clock::time_point now,
|
||||
_timestamp = api::missing_timestamp;
|
||||
return false;
|
||||
}
|
||||
if (_ttl > no_ttl && _expiry < now) {
|
||||
if (_ttl > no_ttl && _expiry <= now) {
|
||||
_expiry -= _ttl;
|
||||
_ttl = dead;
|
||||
}
|
||||
|
||||
@@ -679,7 +679,7 @@ public:
|
||||
if (is_missing() || _ttl == dead) {
|
||||
return false;
|
||||
}
|
||||
if (_ttl != no_ttl && _expiry < now) {
|
||||
if (_ttl != no_ttl && _expiry <= now) {
|
||||
return false;
|
||||
}
|
||||
return _timestamp > t.timestamp;
|
||||
@@ -689,7 +689,7 @@ public:
|
||||
if (_ttl == dead) {
|
||||
return true;
|
||||
}
|
||||
return _ttl != no_ttl && _expiry < now;
|
||||
return _ttl != no_ttl && _expiry <= now;
|
||||
}
|
||||
// Can be called only when is_live().
|
||||
bool is_expiring() const {
|
||||
|
||||
@@ -321,11 +321,7 @@ struct get_sync_boundary_response {
|
||||
};
|
||||
|
||||
// Return value of the REPAIR_GET_COMBINED_ROW_HASH RPC verb
|
||||
struct get_combined_row_hash_response {
|
||||
repair_hash working_row_buf_combined_csum;
|
||||
// The number of rows in the working row buf
|
||||
uint64_t working_row_buf_nr;
|
||||
};
|
||||
using get_combined_row_hash_response = repair_hash;
|
||||
|
||||
struct node_repair_meta_id {
|
||||
gms::inet_address ip;
|
||||
|
||||
@@ -1098,14 +1098,14 @@ private:
|
||||
_working_row_buf_combined_hash.clear();
|
||||
|
||||
if (_row_buf.empty()) {
|
||||
return make_ready_future<get_combined_row_hash_response>(get_combined_row_hash_response{repair_hash(), 0});
|
||||
return make_ready_future<get_combined_row_hash_response>(get_combined_row_hash_response());
|
||||
}
|
||||
return move_row_buf_to_working_row_buf().then([this] {
|
||||
return do_for_each(_working_row_buf, [this] (repair_row& r) {
|
||||
_working_row_buf_combined_hash.add(r.hash());
|
||||
return make_ready_future<>();
|
||||
}).then([this] {
|
||||
return get_combined_row_hash_response{_working_row_buf_combined_hash, _working_row_buf.size()};
|
||||
return get_combined_row_hash_response{_working_row_buf_combined_hash};
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -1352,7 +1352,9 @@ public:
|
||||
auto source_op = get_full_row_hashes_source_op(current_hashes, remote_node, node_idx, source);
|
||||
auto sink_op = get_full_row_hashes_sink_op(sink);
|
||||
return when_all_succeed(std::move(source_op), std::move(sink_op));
|
||||
}).then([current_hashes] () mutable {
|
||||
}).then([this, current_hashes] () mutable {
|
||||
stats().rx_hashes_nr += current_hashes->size();
|
||||
_metrics.rx_hashes_nr += current_hashes->size();
|
||||
return std::move(*current_hashes);
|
||||
});
|
||||
}
|
||||
@@ -1763,6 +1765,7 @@ static future<stop_iteration> repair_get_row_diff_with_rpc_stream_process_op(
|
||||
return make_exception_future<stop_iteration>(std::runtime_error("get_row_diff_with_rpc_stream: Inject error in handler loop"));
|
||||
}
|
||||
bool needs_all_rows = hash_cmd.cmd == repair_stream_cmd::needs_all_rows;
|
||||
_metrics.rx_hashes_nr += current_set_diff.size();
|
||||
auto fp = make_foreign(std::make_unique<std::unordered_set<repair_hash>>(std::move(current_set_diff)));
|
||||
return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id, needs_all_rows, fp = std::move(fp)] {
|
||||
auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
|
||||
@@ -2067,6 +2070,7 @@ future<> repair_init_messaging_service_handler(repair_service& rs, distributed<d
|
||||
std::unordered_set<repair_hash> set_diff, bool needs_all_rows) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
_metrics.rx_hashes_nr += set_diff.size();
|
||||
auto fp = make_foreign(std::make_unique<std::unordered_set<repair_hash>>(std::move(set_diff)));
|
||||
return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id, fp = std::move(fp), needs_all_rows] () mutable {
|
||||
auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
|
||||
@@ -2292,8 +2296,8 @@ private:
|
||||
// are identical, there is no need to transfer each and every
|
||||
// row hashes to the repair master.
|
||||
return master.get_combined_row_hash(_common_sync_boundary, _all_nodes[idx]).then([&, this, idx] (get_combined_row_hash_response resp) {
|
||||
rlogger.debug("Calling master.get_combined_row_hash for node {}, got combined_hash={}, rows_nr={}", _all_nodes[idx], resp.working_row_buf_combined_csum, resp.working_row_buf_nr);
|
||||
combined_hashes[idx]= std::move(resp.working_row_buf_combined_csum);
|
||||
rlogger.debug("Calling master.get_combined_row_hash for node {}, got combined_hash={}", _all_nodes[idx], resp);
|
||||
combined_hashes[idx]= std::move(resp);
|
||||
});
|
||||
}).get();
|
||||
|
||||
|
||||
@@ -931,7 +931,6 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
|
||||
});
|
||||
|
||||
return seastar::async([this, &m, updater = std::move(updater), real_dirty_acc = std::move(real_dirty_acc)] () mutable {
|
||||
coroutine update;
|
||||
size_t size_entry;
|
||||
// In case updater fails, we must bring the cache to consistency without deferring.
|
||||
auto cleanup = defer([&m, this] {
|
||||
@@ -939,6 +938,7 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
|
||||
_prev_snapshot_pos = {};
|
||||
_prev_snapshot = {};
|
||||
});
|
||||
coroutine update; // Destroy before cleanup to release snapshots before invalidating.
|
||||
partition_presence_checker is_present = _prev_snapshot->make_partition_presence_checker();
|
||||
while (!m.partitions.empty()) {
|
||||
with_allocator(_tracker.allocator(), [&] () {
|
||||
|
||||
@@ -109,7 +109,10 @@ std::optional<std::map<sstring, sstring>> schema_mutations::cdc_options() const
|
||||
if (_scylla_tables) {
|
||||
auto rs = query::result_set(*_scylla_tables);
|
||||
if (!rs.empty()) {
|
||||
return db::schema_tables::get_map<sstring, sstring>(rs.row(0), "cdc");
|
||||
auto map = db::schema_tables::get_map<sstring, sstring>(rs.row(0), "cdc");
|
||||
if (map && !map->empty()) {
|
||||
return map;
|
||||
}
|
||||
}
|
||||
}
|
||||
return { };
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 6f0ef32514...8e236efda9
@@ -93,6 +93,7 @@ void migration_manager::init_messaging_service()
|
||||
|
||||
_feature_listeners.push_back(ss.cluster_supports_view_virtual_columns().when_enabled(update_schema));
|
||||
_feature_listeners.push_back(ss.cluster_supports_digest_insensitive_to_expiry().when_enabled(update_schema));
|
||||
_feature_listeners.push_back(ss.cluster_supports_cdc().when_enabled(update_schema));
|
||||
|
||||
auto& ms = netw::get_local_messaging_service();
|
||||
ms.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector<frozen_mutation> fm, rpc::optional<std::vector<canonical_mutation>> cm) {
|
||||
@@ -311,7 +312,8 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr
|
||||
try {
|
||||
for (const auto& cm : canonical_mutations) {
|
||||
auto& tbl = db.find_column_family(cm.column_family_id());
|
||||
mutations.emplace_back(cm.to_mutation(tbl.schema()));
|
||||
mutations.emplace_back(cm.to_mutation(
|
||||
tbl.schema()));
|
||||
}
|
||||
} catch (no_such_column_family& e) {
|
||||
mlogger.error("Error while applying schema mutations from {}: {}", src, e);
|
||||
@@ -902,8 +904,9 @@ future<> migration_manager::announce(std::vector<mutation> mutations, bool annou
|
||||
future<> migration_manager::push_schema_mutation(const gms::inet_address& endpoint, const std::vector<mutation>& schema)
|
||||
{
|
||||
netw::messaging_service::msg_addr id{endpoint, 0};
|
||||
auto fm = std::vector<frozen_mutation>(schema.begin(), schema.end());
|
||||
auto cm = std::vector<canonical_mutation>(schema.begin(), schema.end());
|
||||
auto adjusted_schema = db::schema_tables::adjust_schema_for_schema_features(schema, get_local_storage_service().cluster_schema_features());
|
||||
auto fm = std::vector<frozen_mutation>(adjusted_schema.begin(), adjusted_schema.end());
|
||||
auto cm = std::vector<canonical_mutation>(adjusted_schema.begin(), adjusted_schema.end());
|
||||
return netw::get_local_messaging_service().send_definitions_update(id, std::move(fm), std::move(cm));
|
||||
}
|
||||
|
||||
|
||||
@@ -706,7 +706,9 @@ static future<std::optional<utils::UUID>> sleep_and_restart() {
|
||||
* nodes have seen the most recent commit. Otherwise, return null.
|
||||
*/
|
||||
future<utils::UUID> paxos_response_handler::begin_and_repair_paxos(client_state& cs, unsigned& contentions, bool is_write) {
|
||||
_proxy->get_db().local().get_config().check_experimental("Paxos");
|
||||
if (!_proxy->get_db().local().get_config().check_experimental(db::experimental_features_t::LWT)) {
|
||||
throw std::runtime_error("Paxos is currently disabled. Start Scylla with --experimental-features=lwt to enable.");
|
||||
}
|
||||
return do_with(api::timestamp_type(0), shared_from_this(), [this, &cs, &contentions, is_write]
|
||||
(api::timestamp_type& min_timestamp_micros_to_use, shared_ptr<paxos_response_handler>& prh) {
|
||||
return repeat_until_value([this, &contentions, &cs, &min_timestamp_micros_to_use, is_write] {
|
||||
@@ -1883,8 +1885,9 @@ storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token &
|
||||
});
|
||||
pending_endpoints.erase(itend, pending_endpoints.end());
|
||||
|
||||
size_t participants = pending_endpoints.size() + natural_endpoints.size();
|
||||
size_t required_participants = db::quorum_for(ks) + pending_endpoints.size();
|
||||
const size_t participants = pending_endpoints.size() + natural_endpoints.size();
|
||||
const size_t quorum_size = natural_endpoints.size() / 2 + 1;
|
||||
const size_t required_participants = quorum_size + pending_endpoints.size();
|
||||
|
||||
std::vector<gms::inet_address> live_endpoints;
|
||||
live_endpoints.reserve(participants);
|
||||
|
||||
@@ -344,12 +344,11 @@ std::set<sstring> storage_service::get_config_supported_features_set() {
|
||||
// This should only be true in tests (see cql_test_env.cc:storage_service_for_tests)
|
||||
auto& db = service::get_local_storage_service().db();
|
||||
if (db.local_is_initialized()) {
|
||||
auto& config = service::get_local_storage_service().db().local().get_config();
|
||||
auto& config = db.local().get_config();
|
||||
if (config.enable_sstables_mc_format()) {
|
||||
features.insert(MC_SSTABLE_FEATURE);
|
||||
}
|
||||
if (config.experimental()) {
|
||||
// push additional experimental features
|
||||
if (config.check_experimental(db::experimental_features_t::CDC)) {
|
||||
features.insert(CDC_FEATURE);
|
||||
}
|
||||
}
|
||||
@@ -3533,6 +3532,7 @@ db::schema_features storage_service::cluster_schema_features() const {
|
||||
f.set_if<db::schema_feature::VIEW_VIRTUAL_COLUMNS>(bool(_view_virtual_columns));
|
||||
f.set_if<db::schema_feature::DIGEST_INSENSITIVE_TO_EXPIRY>(bool(_digest_insensitive_to_expiry));
|
||||
f.set_if<db::schema_feature::COMPUTED_COLUMNS>(bool(_computed_columns));
|
||||
f.set_if<db::schema_feature::CDC_OPTIONS>(bool(_cdc_feature));
|
||||
return f;
|
||||
}
|
||||
|
||||
|
||||
@@ -2341,8 +2341,8 @@ public:
|
||||
return bool(_mc_sstable_feature);
|
||||
}
|
||||
|
||||
bool cluster_supports_cdc() const {
|
||||
return bool(_cdc_feature);
|
||||
const gms::feature& cluster_supports_cdc() const {
|
||||
return _cdc_feature;
|
||||
}
|
||||
|
||||
bool cluster_supports_row_level_repair() const {
|
||||
|
||||
@@ -2699,7 +2699,7 @@ entry_descriptor entry_descriptor::make_descriptor(sstring sstdir, sstring fname
|
||||
static std::regex la_mc("(la|mc)-(\\d+)-(\\w+)-(.*)");
|
||||
static std::regex ka("(\\w+)-(\\w+)-ka-(\\d+)-(.*)");
|
||||
|
||||
static std::regex dir(".*/([^/]*)/(\\w+)-[\\da-fA-F]+(?:/staging|/upload|/snapshots/[^/]+)?/?");
|
||||
static std::regex dir(".*/([^/]*)/([^/]+)-[\\da-fA-F]+(?:/staging|/upload|/snapshots/[^/]+)?/?");
|
||||
|
||||
std::smatch match;
|
||||
|
||||
|
||||
8
table.cc
8
table.cc
@@ -292,7 +292,7 @@ create_single_key_sstable_reader(column_family* cf,
|
||||
filter_sstable_for_reader(sstables->select(pr), *cf, schema, pr, key, slice)
|
||||
| boost::adaptors::transformed([&] (const sstables::shared_sstable& sstable) {
|
||||
tracing::trace(trace_state, "Reading key {} from sstable {}", pr, seastar::value_of([&sstable] { return sstable->get_filename(); }));
|
||||
return sstable->read_row_flat(schema, pr.start()->value(), slice, pc, resource_tracker, std::move(trace_state), fwd);
|
||||
return sstable->read_row_flat(schema, pr.start()->value(), slice, pc, resource_tracker, trace_state, fwd);
|
||||
})
|
||||
);
|
||||
if (readers.empty()) {
|
||||
@@ -315,7 +315,7 @@ flat_mutation_reader make_range_sstable_reader(schema_ptr s,
|
||||
{
|
||||
auto reader_factory_fn = [s, &slice, &pc, resource_tracker, trace_state, fwd, fwd_mr, &monitor_generator]
|
||||
(sstables::shared_sstable& sst, const dht::partition_range& pr) mutable {
|
||||
return sst->read_range_rows_flat(s, pr, slice, pc, resource_tracker, std::move(trace_state), fwd, fwd_mr, monitor_generator(sst));
|
||||
return sst->read_range_rows_flat(s, pr, slice, pc, resource_tracker, trace_state, fwd, fwd_mr, monitor_generator(sst));
|
||||
};
|
||||
return make_combined_reader(s, std::make_unique<incremental_reader_selector>(s,
|
||||
std::move(sstables),
|
||||
@@ -587,7 +587,7 @@ flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s,
|
||||
auto reader_factory_fn = [s, &slice, &pc, resource_tracker, trace_state, fwd, fwd_mr, &monitor_generator]
|
||||
(sstables::shared_sstable& sst, const dht::partition_range& pr) mutable {
|
||||
flat_mutation_reader reader = sst->read_range_rows_flat(s, pr, slice, pc,
|
||||
resource_tracker, std::move(trace_state), fwd, fwd_mr, monitor_generator(sst));
|
||||
resource_tracker, trace_state, fwd, fwd_mr, monitor_generator(sst));
|
||||
if (sst->is_shared()) {
|
||||
using sig = bool (&)(const dht::decorated_key&);
|
||||
reader = make_filtering_reader(std::move(reader), sig(belongs_to_current_shard));
|
||||
@@ -2543,7 +2543,7 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema
|
||||
std::move(slice),
|
||||
std::move(m),
|
||||
[base, views = std::move(views), lock = std::move(lock), this, timeout, source = std::move(source), &io_priority] (auto& pk, auto& slice, auto& m) mutable {
|
||||
auto reader = source.make_reader(base, pk, slice, io_priority);
|
||||
auto reader = source.make_reader(base, pk, slice, io_priority, nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
|
||||
return this->generate_and_propagate_view_updates(base, std::move(views), std::move(m), std::move(reader)).then([lock = std::move(lock)] () mutable {
|
||||
// return the local partition/row lock we have taken so it
|
||||
// remains locked until the caller is done modifying this
|
||||
|
||||
9
test.py
9
test.py
@@ -131,6 +131,7 @@ boost_tests = [
|
||||
'data_listeners_test',
|
||||
'truncation_migration_test',
|
||||
'like_matcher_test',
|
||||
'enum_option_test',
|
||||
]
|
||||
|
||||
other_tests = [
|
||||
@@ -265,7 +266,7 @@ if __name__ == "__main__":
|
||||
env['UBSAN_OPTIONS'] = 'print_stacktrace=1'
|
||||
env['BOOST_TEST_CATCH_SYSTEM_ERRORS'] = 'no'
|
||||
|
||||
def run_test(path, type, exec_args):
|
||||
def run_test(path, repeat, type, exec_args):
|
||||
boost_args = []
|
||||
# avoid modifying in-place, it will change test_to_run
|
||||
exec_args = exec_args + '--collectd 0'.split()
|
||||
@@ -274,7 +275,7 @@ if __name__ == "__main__":
|
||||
mode = 'release'
|
||||
if path.startswith(os.path.join('build', 'debug')):
|
||||
mode = 'debug'
|
||||
xmlout = (args.jenkins + "." + mode + "." + os.path.basename(path.split()[0]) + ".boost.xml")
|
||||
xmlout = (args.jenkins + "." + mode + "." + os.path.basename(path.split()[0]) + "." + str(repeat) + ".boost.xml")
|
||||
boost_args += ['--report_level=no', '--logger=HRF,test_suite:XML,test_suite,' + xmlout]
|
||||
if type == 'boost':
|
||||
boost_args += ['--']
|
||||
@@ -312,8 +313,8 @@ if __name__ == "__main__":
|
||||
path = test[0]
|
||||
test_type = test[1]
|
||||
exec_args = test[2] if len(test) >= 3 else []
|
||||
for _ in range(args.repeat):
|
||||
futures.append(executor.submit(run_test, path, test_type, exec_args))
|
||||
for repeat in range(args.repeat):
|
||||
futures.append(executor.submit(run_test, path, repeat, test_type, exec_args))
|
||||
|
||||
results = []
|
||||
cookie = len(futures)
|
||||
|
||||
@@ -844,14 +844,20 @@ inline std::basic_ostream<Args...> & operator<<(std::basic_ostream<Args...> & os
|
||||
}
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
void throw_on_error(const sstring& opt, const sstring& msg, std::optional<utils::config_file::value_status> status) {
|
||||
if (status != config::value_status::Invalid) {
|
||||
throw std::invalid_argument(msg + " : " + opt);
|
||||
}
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
SEASTAR_TEST_CASE(test_parse_yaml) {
|
||||
config cfg;
|
||||
|
||||
cfg.read_from_yaml(cassandra_conf, [](auto& opt, auto& msg, auto status) {
|
||||
if (status != config::value_status::Invalid) {
|
||||
throw std::invalid_argument(msg + " : " + opt);
|
||||
}
|
||||
});
|
||||
cfg.read_from_yaml(cassandra_conf, throw_on_error);
|
||||
|
||||
BOOST_CHECK_EQUAL(cfg.cluster_name(), "Test Cluster");
|
||||
BOOST_CHECK_EQUAL(cfg.cluster_name.is_set(), true);
|
||||
@@ -917,3 +923,78 @@ SEASTAR_TEST_CASE(test_parse_broken) {
|
||||
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
using ef = experimental_features_t;
|
||||
using features = std::vector<enum_option<ef>>;
|
||||
|
||||
SEASTAR_TEST_CASE(test_parse_experimental_features_cdc) {
|
||||
config cfg;
|
||||
cfg.read_from_yaml("experimental_features:\n - cdc\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::CDC});
|
||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_parse_experimental_features_lwt) {
|
||||
config cfg;
|
||||
cfg.read_from_yaml("experimental_features:\n - lwt\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::LWT});
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_parse_experimental_features_udf) {
|
||||
config cfg;
|
||||
cfg.read_from_yaml("experimental_features:\n - udf\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UDF});
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_parse_experimental_features_multiple) {
|
||||
config cfg;
|
||||
cfg.read_from_yaml("experimental_features:\n - cdc\n - lwt\n - cdc\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::CDC, ef::LWT, ef::CDC}));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_parse_experimental_features_invalid) {
|
||||
config cfg;
|
||||
using value_status = utils::config_file::value_status;
|
||||
cfg.read_from_yaml("experimental_features:\n - invalidoptiontvaluedonotuse\n",
|
||||
[&cfg] (const sstring& opt, const sstring& msg, std::optional<value_status> status) {
|
||||
BOOST_REQUIRE_EQUAL(opt, "experimental_features");
|
||||
BOOST_REQUIRE_NE(msg.find("line 2, column 7"), msg.npos);
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
});
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_parse_experimental_true) {
|
||||
config cfg;
|
||||
cfg.read_from_yaml("experimental: true", throw_on_error);
|
||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_parse_experimental_false) {
|
||||
config cfg;
|
||||
cfg.read_from_yaml("experimental: false", throw_on_error);
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -4070,6 +4070,8 @@ SEASTAR_TEST_CASE(test_like_operator_on_clustering_key) {
|
||||
require_rows(e, "select s from t where s like '%c' allow filtering", {{T("abc")}});
|
||||
cquery_nofail(e, "insert into t (p, s) values (2, 'acc')");
|
||||
require_rows(e, "select s from t where s like '%c' allow filtering", {{T("abc")}, {T("acc")}});
|
||||
cquery_nofail(e, "insert into t (p, s) values (2, 'acd')");
|
||||
require_rows(e, "select s from t where p = 2 and s like '%c' allow filtering", {{T("acc")}});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -352,7 +352,10 @@ public:
|
||||
cfg->view_hints_directory.set(data_dir_path + "/view_hints.dir");
|
||||
cfg->num_tokens.set(256);
|
||||
cfg->ring_delay_ms.set(500);
|
||||
cfg->experimental.set(true);
|
||||
auto features = cfg->experimental_features();
|
||||
features.emplace_back(db::experimental_features_t::CDC);
|
||||
features.emplace_back(db::experimental_features_t::LWT);
|
||||
cfg->experimental_features(features);
|
||||
cfg->shutdown_announce_in_ms.set(0);
|
||||
cfg->broadcast_to_all_shards().get();
|
||||
create_directories((data_dir_path + "/system").c_str());
|
||||
|
||||
175
tests/enum_option_test.cc
Normal file
175
tests/enum_option_test.cc
Normal file
@@ -0,0 +1,175 @@
|
||||
/*
|
||||
* Copyright (C) 2019 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define BOOST_TEST_MODULE core
|
||||
|
||||
#include <boost/test/unit_test.hpp>
|
||||
|
||||
#include <array>
|
||||
#include <boost/program_options.hpp>
|
||||
#include <map>
|
||||
#include <set>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
|
||||
#include "utils/enum_option.hh"
|
||||
|
||||
namespace po = boost::program_options;
|
||||
|
||||
namespace {
|
||||
|
||||
struct days {
|
||||
enum enumeration { Mo, Tu, We, Th, Fr, Sa, Su };
|
||||
static std::unordered_map<std::string, enumeration> map() {
|
||||
return {{"Mon", Mo}, {"Tue", Tu}, {"Wed", We}, {"Thu", Th}, {"Fri", Fr}, {"Sat", Sa}, {"Sun", Su}};
|
||||
}
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
enum_option<T> parse(const char* value) {
|
||||
po::options_description desc("Allowed options");
|
||||
desc.add_options()("opt", po::value<enum_option<T>>(), "Option");
|
||||
po::variables_map vm;
|
||||
const char* argv[] = {"$0", "--opt", value};
|
||||
po::store(po::parse_command_line(3, argv, desc), vm);
|
||||
return vm["opt"].as<enum_option<T>>();
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
std::string format(typename T::enumeration d) {
|
||||
std::ostringstream os;
|
||||
os << enum_option<T>(d);
|
||||
return os.str();
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_parsing) {
|
||||
BOOST_CHECK_EQUAL(parse<days>("Sun"), days::Su);
|
||||
BOOST_CHECK_EQUAL(parse<days>("Mon"), days::Mo);
|
||||
BOOST_CHECK_EQUAL(parse<days>("Tue"), days::Tu);
|
||||
BOOST_CHECK_EQUAL(parse<days>("Wed"), days::We);
|
||||
BOOST_CHECK_EQUAL(parse<days>("Thu"), days::Th);
|
||||
BOOST_CHECK_EQUAL(parse<days>("Fri"), days::Fr);
|
||||
BOOST_CHECK_EQUAL(parse<days>("Sat"), days::Sa);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_parsing_error) {
|
||||
BOOST_REQUIRE_THROW(parse<days>("Sunday"), po::invalid_option_value);
|
||||
BOOST_REQUIRE_THROW(parse<days>(""), po::invalid_option_value);
|
||||
BOOST_REQUIRE_THROW(parse<days>(" "), po::invalid_option_value);
|
||||
BOOST_REQUIRE_THROW(parse<days>(" Sun"), po::invalid_option_value);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_formatting) {
|
||||
BOOST_CHECK_EQUAL(format<days>(days::Mo), "Mon");
|
||||
BOOST_CHECK_EQUAL(format<days>(days::Tu), "Tue");
|
||||
BOOST_CHECK_EQUAL(format<days>(days::We), "Wed");
|
||||
BOOST_CHECK_EQUAL(format<days>(days::Th), "Thu");
|
||||
BOOST_CHECK_EQUAL(format<days>(days::Fr), "Fri");
|
||||
BOOST_CHECK_EQUAL(format<days>(days::Sa), "Sat");
|
||||
BOOST_CHECK_EQUAL(format<days>(days::Su), "Sun");
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_formatting_unknown) {
|
||||
BOOST_CHECK_EQUAL(format<days>(static_cast<days::enumeration>(77)), "?unknown");
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
struct names {
|
||||
enum enumeration { John, Jane, Jim };
|
||||
static std::map<std::string, enumeration> map() {
|
||||
return {{"John", John}, {"Jane", Jane}, {"James", Jim}};
|
||||
}
|
||||
};
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_ordered_map) {
|
||||
BOOST_CHECK_EQUAL(parse<names>("James"), names::Jim);
|
||||
BOOST_CHECK_EQUAL(format<names>(names::Jim), "James");
|
||||
BOOST_CHECK_EQUAL(parse<names>("John"), names::John);
|
||||
BOOST_CHECK_EQUAL(format<names>(names::John), "John");
|
||||
BOOST_CHECK_EQUAL(parse<names>("Jane"), names::Jane);
|
||||
BOOST_CHECK_EQUAL(format<names>(names::Jane), "Jane");
|
||||
BOOST_CHECK_THROW(parse<names>("Jimbo"), po::invalid_option_value);
|
||||
BOOST_CHECK_EQUAL(format<names>(static_cast<names::enumeration>(77)), "?unknown");
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
struct cities {
|
||||
enum enumeration { SF, TO, NY };
|
||||
static std::unordered_map<std::string, enumeration> map() {
|
||||
return {
|
||||
{"SanFrancisco", SF}, {"SF", SF}, {"SFO", SF}, {"Frisco", SF},
|
||||
{"Toronto", TO}, {"TO", TO}, {"YYZ", TO}, {"TheSix", TO},
|
||||
{"NewYork", NY}, {"NY", NY}, {"NYC", NY}, {"BigApple", NY},
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_multiple_parse) {
|
||||
BOOST_CHECK_EQUAL(parse<cities>("SanFrancisco"), cities::SF);
|
||||
BOOST_CHECK_EQUAL(parse<cities>("SF"), cities::SF);
|
||||
BOOST_CHECK_EQUAL(parse<cities>("SFO"), cities::SF);
|
||||
BOOST_CHECK_EQUAL(parse<cities>("Frisco"), cities::SF);
|
||||
BOOST_CHECK_EQUAL(parse<cities>("Toronto"), cities::TO);
|
||||
BOOST_CHECK_EQUAL(parse<cities>("TO"), cities::TO);
|
||||
BOOST_CHECK_EQUAL(parse<cities>("YYZ"), cities::TO);
|
||||
BOOST_CHECK_EQUAL(parse<cities>("TheSix"), cities::TO);
|
||||
BOOST_CHECK_EQUAL(parse<cities>("NewYork"), cities::NY);
|
||||
BOOST_CHECK_EQUAL(parse<cities>("NY"), cities::NY);
|
||||
BOOST_CHECK_EQUAL(parse<cities>("NYC"), cities::NY);
|
||||
BOOST_CHECK_EQUAL(parse<cities>("BigApple"), cities::NY);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_multiple_format) {
|
||||
BOOST_CHECK((std::set<std::string>{"SanFrancisco", "SF", "SFO", "Frisco"}).count(format<cities>(cities::SF)));
|
||||
BOOST_CHECK((std::set<std::string>{"Toronto", "TO", "YYZ", "TheSix"}).count(format<cities>(cities::TO)));
|
||||
BOOST_CHECK((std::set<std::string>{"NewYork", "NY", "NYC", "BigApple"}).count(format<cities>(cities::NY)));
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
struct numbers {
|
||||
enum enumeration { ONE, TWO };
|
||||
static std::unordered_map<int, enumeration> map() {
|
||||
return {{1, ONE}, {2, TWO}};
|
||||
}
|
||||
};
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_non_string) {
|
||||
BOOST_CHECK_EQUAL(parse<numbers>("1"), numbers::ONE);
|
||||
BOOST_CHECK_EQUAL(parse<numbers>("2"), numbers::TWO);
|
||||
BOOST_CHECK_THROW(parse<numbers>("3"), po::invalid_option_value);
|
||||
BOOST_CHECK_THROW(parse<numbers>("xx"), po::invalid_option_value);
|
||||
BOOST_CHECK_THROW(parse<numbers>(""), po::invalid_option_value);
|
||||
BOOST_CHECK_EQUAL(format<numbers>(numbers::ONE), "1");
|
||||
BOOST_CHECK_EQUAL(format<numbers>(numbers::TWO), "2");
|
||||
BOOST_CHECK_EQUAL(format<numbers>(static_cast<numbers::enumeration>(77)), "?unknown");
|
||||
}
|
||||
@@ -1320,6 +1320,104 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_upgrade_type_change) {
|
||||
assert_that(m).is_equal_to(m2);
|
||||
}
|
||||
|
||||
// This test checks the behavior of row_marker::{is_live, is_dead, compact_and_expire}. Those functions have some
|
||||
// duplicated logic that decides if a row is expired, and this test verifies that they behave the same with respect
|
||||
// to TTL.
|
||||
SEASTAR_THREAD_TEST_CASE(test_row_marker_expiry) {
|
||||
can_gc_fn never_gc = [] (tombstone) { return false; };
|
||||
|
||||
auto must_be_alive = [&] (row_marker mark, gc_clock::time_point t) {
|
||||
BOOST_TEST_MESSAGE(format("must_be_alive({}, {})", mark, t));
|
||||
BOOST_REQUIRE(mark.is_live(tombstone(), t));
|
||||
BOOST_REQUIRE(mark.is_missing() || !mark.is_dead(t));
|
||||
BOOST_REQUIRE(mark.compact_and_expire(tombstone(), t, never_gc, gc_clock::time_point()));
|
||||
};
|
||||
|
||||
auto must_be_dead = [&] (row_marker mark, gc_clock::time_point t) {
|
||||
BOOST_TEST_MESSAGE(format("must_be_dead({}, {})", mark, t));
|
||||
BOOST_REQUIRE(!mark.is_live(tombstone(), t));
|
||||
BOOST_REQUIRE(mark.is_missing() || mark.is_dead(t));
|
||||
BOOST_REQUIRE(!mark.compact_and_expire(tombstone(), t, never_gc, gc_clock::time_point()));
|
||||
};
|
||||
|
||||
const auto timestamp = api::timestamp_type(1);
|
||||
const auto t0 = gc_clock::now();
|
||||
const auto t1 = t0 + 1s;
|
||||
const auto t2 = t0 + 2s;
|
||||
const auto t3 = t0 + 3s;
|
||||
|
||||
// Without timestamp the marker is missing (doesn't exist)
|
||||
const row_marker m1;
|
||||
must_be_dead(m1, t0);
|
||||
must_be_dead(m1, t1);
|
||||
must_be_dead(m1, t2);
|
||||
must_be_dead(m1, t3);
|
||||
|
||||
// With timestamp and without ttl, a row_marker is always alive
|
||||
const row_marker m2(timestamp);
|
||||
must_be_alive(m2, t0);
|
||||
must_be_alive(m2, t1);
|
||||
must_be_alive(m2, t2);
|
||||
must_be_alive(m2, t3);
|
||||
|
||||
// A row_marker becomes dead exactly at the moment of expiry
|
||||
// Reproduces #4263, #5290
|
||||
const auto ttl = 1s;
|
||||
const row_marker m3(timestamp, ttl, t2);
|
||||
must_be_alive(m3, t0);
|
||||
must_be_alive(m3, t1);
|
||||
must_be_dead(m3, t2);
|
||||
must_be_dead(m3, t3);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_querying_expired_rows) {
|
||||
auto s = schema_builder("ks", "cf")
|
||||
.with_column("pk", bytes_type, column_kind::partition_key)
|
||||
.with_column("ck", bytes_type, column_kind::clustering_key)
|
||||
.build();
|
||||
|
||||
auto pk = partition_key::from_singular(*s, data_value(bytes("key1")));
|
||||
auto ckey1 = clustering_key::from_singular(*s, data_value(bytes("A")));
|
||||
auto ckey2 = clustering_key::from_singular(*s, data_value(bytes("B")));
|
||||
auto ckey3 = clustering_key::from_singular(*s, data_value(bytes("C")));
|
||||
|
||||
auto ttl = 1s;
|
||||
auto t0 = gc_clock::now();
|
||||
auto t1 = t0 + 1s;
|
||||
auto t2 = t0 + 2s;
|
||||
auto t3 = t0 + 3s;
|
||||
|
||||
auto results_at_time = [s] (const mutation& m, gc_clock::time_point t) {
|
||||
auto slice = partition_slice_builder(*s)
|
||||
.without_partition_key_columns()
|
||||
.build();
|
||||
auto opts = query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash};
|
||||
return query::result_set::from_raw_result(s, slice, m.query(slice, opts, t));
|
||||
};
|
||||
|
||||
mutation m(s, pk);
|
||||
m.partition().clustered_row(*m.schema(), ckey1).apply(row_marker(api::new_timestamp(), ttl, t1));
|
||||
m.partition().clustered_row(*m.schema(), ckey2).apply(row_marker(api::new_timestamp(), ttl, t2));
|
||||
m.partition().clustered_row(*m.schema(), ckey3).apply(row_marker(api::new_timestamp(), ttl, t3));
|
||||
|
||||
assert_that(results_at_time(m, t0))
|
||||
.has_size(3)
|
||||
.has(a_row().with_column("ck", data_value(bytes("A"))))
|
||||
.has(a_row().with_column("ck", data_value(bytes("B"))))
|
||||
.has(a_row().with_column("ck", data_value(bytes("C"))));
|
||||
|
||||
assert_that(results_at_time(m, t1))
|
||||
.has_size(2)
|
||||
.has(a_row().with_column("ck", data_value(bytes("B"))))
|
||||
.has(a_row().with_column("ck", data_value(bytes("C"))));
|
||||
|
||||
assert_that(results_at_time(m, t2))
|
||||
.has_size(1)
|
||||
.has(a_row().with_column("ck", data_value(bytes("C"))));
|
||||
|
||||
assert_that(results_at_time(m, t3)).is_empty();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_querying_expired_cells) {
|
||||
return seastar::async([] {
|
||||
auto s = schema_builder("ks", "cf")
|
||||
|
||||
@@ -371,10 +371,8 @@ SEASTAR_TEST_CASE(test_merging_does_not_alter_tables_which_didnt_change) {
|
||||
muts2.push_back(db::schema_tables::make_scylla_tables_mutation(s0, api::new_timestamp()));
|
||||
mm.announce(muts2).get();
|
||||
|
||||
// SCYLLA_TABLES have additional columns so announcing its mutation
|
||||
// changes the tables
|
||||
BOOST_REQUIRE(s1 != find_table().schema());
|
||||
BOOST_REQUIRE(legacy_version != find_table().schema()->version());
|
||||
BOOST_REQUIRE(s1 == find_table().schema());
|
||||
BOOST_REQUIRE_EQUAL(legacy_version, find_table().schema()->version());
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -575,7 +573,7 @@ SEASTAR_TEST_CASE(test_prepared_statement_is_invalidated_by_schema_change) {
|
||||
|
||||
// We don't want schema digest to change between Scylla versions because that results in a schema disagreement
|
||||
// during rolling upgrade.
|
||||
future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_dir, std::set<sstring> disabled_features, std::vector<utils::UUID> expected_digests) {
|
||||
future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_dir, std::set<sstring> disabled_features, std::vector<utils::UUID> expected_digests, std::function<void(cql_test_env& e)> extra_schema_changes) {
|
||||
using namespace db;
|
||||
using namespace db::schema_tables;
|
||||
|
||||
@@ -588,6 +586,7 @@ future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_
|
||||
|
||||
auto db_cfg_ptr = make_shared<db::config>();
|
||||
auto& db_cfg = *db_cfg_ptr;
|
||||
db_cfg.experimental_features({experimental_features_t::UDF}, db::config::config_source::CommandLine);
|
||||
if (regenerate) {
|
||||
db_cfg.data_file_directories({data_dir}, db::config::config_source::CommandLine);
|
||||
} else {
|
||||
@@ -597,7 +596,7 @@ future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_
|
||||
cql_test_config cfg_in(db_cfg_ptr);
|
||||
cfg_in.disabled_features = std::move(disabled_features);
|
||||
|
||||
return do_with_cql_env_thread([regenerate, expected_digests = std::move(expected_digests)](cql_test_env& e) {
|
||||
return do_with_cql_env_thread([regenerate, expected_digests = std::move(expected_digests), extra_schema_changes = std::move(extra_schema_changes)] (cql_test_env& e) {
|
||||
if (regenerate) {
|
||||
// Exercise many different kinds of schema changes.
|
||||
e.execute_cql(
|
||||
@@ -613,6 +612,7 @@ future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_
|
||||
e.execute_cql(
|
||||
"create keyspace tests2 with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };").get();
|
||||
e.execute_cql("drop keyspace tests2;").get();
|
||||
extra_schema_changes(e);
|
||||
}
|
||||
|
||||
auto expect_digest = [&] (schema_features sf, utils::UUID expected) {
|
||||
@@ -673,7 +673,7 @@ SEASTAR_TEST_CASE(test_schema_digest_does_not_change) {
|
||||
utils::UUID("1d91ad22-ea7c-3e7f-9557-87f0f3bb94d7"),
|
||||
utils::UUID("2dcd4a37-cbb5-399b-b3c9-8eb1398b096b")
|
||||
};
|
||||
return test_schema_digest_does_not_change_with_disabled_features("./tests/sstables/schema_digest_test", std::set<sstring>{"COMPUTED_COLUMNS"}, std::move(expected_digests));
|
||||
return test_schema_digest_does_not_change_with_disabled_features("./tests/sstables/schema_digest_test", std::set<sstring>{"COMPUTED_COLUMNS", "CDC"}, std::move(expected_digests), [] (cql_test_env& e) {});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_schema_digest_does_not_change_after_computed_columns) {
|
||||
@@ -688,5 +688,26 @@ SEASTAR_TEST_CASE(test_schema_digest_does_not_change_after_computed_columns) {
|
||||
utils::UUID("d58e5214-516e-3d0b-95b5-01ab71584a8d"),
|
||||
utils::UUID("e1b50bed-2ab8-3759-92c7-1f4288046ae6")
|
||||
};
|
||||
return test_schema_digest_does_not_change_with_disabled_features("./tests/sstables/schema_digest_test_computed_columns", std::set<sstring>{}, std::move(expected_digests));
|
||||
return test_schema_digest_does_not_change_with_disabled_features("./tests/sstables/schema_digest_test_computed_columns", std::set<sstring>{"CDC"}, std::move(expected_digests), [] (cql_test_env& e) {});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_schema_digest_does_not_change_with_cdc_options) {
|
||||
std::vector<utils::UUID> expected_digests{
|
||||
utils::UUID("a1f07f31-59d6-372a-8c94-7ea467354b39"),
|
||||
utils::UUID("524d418d-a2e2-3fc3-bf45-5fb79b33c7e4"),
|
||||
utils::UUID("524d418d-a2e2-3fc3-bf45-5fb79b33c7e4"),
|
||||
utils::UUID("018fccba-8050-3bb9-a0a5-2b3c5f0371fe"),
|
||||
utils::UUID("018fccba-8050-3bb9-a0a5-2b3c5f0371fe"),
|
||||
utils::UUID("58f4254e-cc3b-3d56-8a45-167f9a3ea423"),
|
||||
utils::UUID("48fda4f8-d7b5-3e59-a47a-7397989a9bf8"),
|
||||
utils::UUID("8049bcfe-eb01-3a59-af33-16cef8a34b45"),
|
||||
utils::UUID("2195a821-b2b8-3cb8-a179-2f5042e90841")
|
||||
};
|
||||
return test_schema_digest_does_not_change_with_disabled_features(
|
||||
"./tests/sstables/schema_digest_test_cdc_options",
|
||||
std::set<sstring>{},
|
||||
std::move(expected_digests),
|
||||
[] (cql_test_env& e) {
|
||||
e.execute_cql("create table tests.table_cdc (pk int primary key, c1 int, c2 int) with cdc = {'enabled':'true'};").get();
|
||||
});
|
||||
}
|
||||
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
547038858
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,9 @@
|
||||
CompressionInfo.db
|
||||
Filter.db
|
||||
Data.db
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Digest.crc32
|
||||
Scylla.db
|
||||
Index.db
|
||||
Summary.db
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
579241509
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,9 @@
|
||||
CompressionInfo.db
|
||||
Filter.db
|
||||
Data.db
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Digest.crc32
|
||||
Scylla.db
|
||||
Index.db
|
||||
Summary.db
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
2172984605
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,9 @@
|
||||
CompressionInfo.db
|
||||
Filter.db
|
||||
Data.db
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Digest.crc32
|
||||
Scylla.db
|
||||
Index.db
|
||||
Summary.db
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
2014320564
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,9 @@
|
||||
CompressionInfo.db
|
||||
Filter.db
|
||||
Data.db
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Digest.crc32
|
||||
Scylla.db
|
||||
Index.db
|
||||
Summary.db
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
1560009820
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,9 @@
|
||||
CompressionInfo.db
|
||||
Filter.db
|
||||
Data.db
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Digest.crc32
|
||||
Scylla.db
|
||||
Index.db
|
||||
Summary.db
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
3276779049
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,9 @@
|
||||
CompressionInfo.db
|
||||
Filter.db
|
||||
Data.db
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Digest.crc32
|
||||
Scylla.db
|
||||
Index.db
|
||||
Summary.db
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user