Compare commits
49 Commits
scylla-3.2
...
next-3.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d112a230c0 | ||
|
|
4371cb41d0 | ||
|
|
a8b9f94dcb | ||
|
|
77500f9171 | ||
|
|
13328e7253 | ||
|
|
79b58f89f1 | ||
|
|
ba2821ec70 | ||
|
|
d72555e786 | ||
|
|
4c38534f75 | ||
|
|
a092f5d1f4 | ||
|
|
723fd50712 | ||
|
|
89deac7795 | ||
|
|
3843e5233c | ||
|
|
1b3c78480c | ||
|
|
48253eb183 | ||
|
|
5d60522c81 | ||
|
|
63e93110d1 | ||
|
|
83105efba8 | ||
|
|
5840eb602a | ||
|
|
61738999ea | ||
|
|
0b23e7145d | ||
|
|
3374aa20bb | ||
|
|
c4e89ea1b0 | ||
|
|
26d9ce6b98 | ||
|
|
6d1a4e2c0b | ||
|
|
fad143a441 | ||
|
|
bc07b877a5 | ||
|
|
09ad011f98 | ||
|
|
b34973df4e | ||
|
|
d65e2ac6af | ||
|
|
dbf72c72b3 | ||
|
|
b542b9c89a | ||
|
|
c0e493edcc | ||
|
|
88718996ed | ||
|
|
97236a2cee | ||
|
|
6c272b48f5 | ||
|
|
6a8ae87efa | ||
|
|
d24d9d037e | ||
|
|
43766bd453 | ||
|
|
ddd8f9b1d1 | ||
|
|
e3e301906d | ||
|
|
2c822d4c1f | ||
|
|
04f8800b5b | ||
|
|
a72a06d3b7 | ||
|
|
f9b11c9b30 | ||
|
|
798357f656 | ||
|
|
7eb86fbbb4 | ||
|
|
edf431f581 | ||
|
|
3f358c9772 |
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=3.2.rc4
|
||||
VERSION=3.2.5
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -254,6 +254,9 @@ void set_storage_service(http_context& ctx, routes& r) {
|
||||
if (column_family.empty()) {
|
||||
resp = service::get_local_storage_service().take_snapshot(tag, keynames);
|
||||
} else {
|
||||
if (keynames.empty()) {
|
||||
throw httpd::bad_param_exception("The keyspace of column families must be specified");
|
||||
}
|
||||
if (keynames.size() > 1) {
|
||||
throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family");
|
||||
}
|
||||
|
||||
@@ -74,7 +74,7 @@ public:
|
||||
options() = default;
|
||||
options(const std::map<sstring, sstring>& map) {
|
||||
if (map.find("enabled") == std::end(map)) {
|
||||
throw exceptions::configuration_exception("Missing enabled CDC option");
|
||||
return;
|
||||
}
|
||||
|
||||
for (auto& p : map) {
|
||||
@@ -92,6 +92,9 @@ public:
|
||||
}
|
||||
}
|
||||
std::map<sstring, sstring> to_map() const {
|
||||
if (!_enabled) {
|
||||
return {};
|
||||
}
|
||||
return {
|
||||
{ "enabled", _enabled ? "true" : "false" },
|
||||
{ "preimage", _preimage ? "true" : "false" },
|
||||
|
||||
@@ -244,7 +244,6 @@ batch_size_fail_threshold_in_kb: 50
|
||||
# experimental_features:
|
||||
# - cdc
|
||||
# - lwt
|
||||
# - udf
|
||||
|
||||
# The directory where hints files are stored if hinted handoff is enabled.
|
||||
# hints_directory: /var/lib/scylla/hints
|
||||
|
||||
@@ -266,7 +266,7 @@ bool column_condition::applies_to(const data_value* cell_value, const query_opti
|
||||
return value.has_value() && is_satisfied_by(operator_type::EQ, *cell_value->type(), *column.type, *cell_value, *value);
|
||||
});
|
||||
} else {
|
||||
return std::any_of(in_values.begin(), in_values.end(), [] (const bytes_opt& value) { return value.has_value() == false; });
|
||||
return std::any_of(in_values.begin(), in_values.end(), [] (const bytes_opt& value) { return !value.has_value() || value->empty(); });
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -61,6 +61,16 @@ make_now_fct() {
|
||||
});
|
||||
}
|
||||
|
||||
static int64_t get_valid_timestamp(const data_value& ts_obj) {
|
||||
auto ts = value_cast<db_clock::time_point>(ts_obj);
|
||||
int64_t ms = ts.time_since_epoch().count();
|
||||
auto nanos_since = utils::UUID_gen::make_nanos_since(ms);
|
||||
if (!utils::UUID_gen::is_valid_nanos_since(nanos_since)) {
|
||||
throw exceptions::server_exception(format("{}: timestamp is out of range. Must be in milliseconds since epoch", ms));
|
||||
}
|
||||
return ms;
|
||||
}
|
||||
|
||||
inline
|
||||
shared_ptr<function>
|
||||
make_min_timeuuid_fct() {
|
||||
@@ -74,8 +84,7 @@ make_min_timeuuid_fct() {
|
||||
if (ts_obj.is_null()) {
|
||||
return {};
|
||||
}
|
||||
auto ts = value_cast<db_clock::time_point>(ts_obj);
|
||||
auto uuid = utils::UUID_gen::min_time_UUID(ts.time_since_epoch().count());
|
||||
auto uuid = utils::UUID_gen::min_time_UUID(get_valid_timestamp(ts_obj));
|
||||
return {timeuuid_type->decompose(uuid)};
|
||||
});
|
||||
}
|
||||
@@ -85,7 +94,6 @@ shared_ptr<function>
|
||||
make_max_timeuuid_fct() {
|
||||
return make_native_scalar_function<true>("maxtimeuuid", timeuuid_type, { timestamp_type },
|
||||
[] (cql_serialization_format sf, const std::vector<bytes_opt>& values) -> bytes_opt {
|
||||
// FIXME: should values be a vector<optional<bytes>>?
|
||||
auto& bb = values[0];
|
||||
if (!bb) {
|
||||
return {};
|
||||
@@ -94,12 +102,22 @@ make_max_timeuuid_fct() {
|
||||
if (ts_obj.is_null()) {
|
||||
return {};
|
||||
}
|
||||
auto ts = value_cast<db_clock::time_point>(ts_obj);
|
||||
auto uuid = utils::UUID_gen::max_time_UUID(ts.time_since_epoch().count());
|
||||
auto uuid = utils::UUID_gen::max_time_UUID(get_valid_timestamp(ts_obj));
|
||||
return {timeuuid_type->decompose(uuid)};
|
||||
});
|
||||
}
|
||||
|
||||
/// Decodes `raw` into a UUID and returns it only if it is a time-based UUID.
///
/// @throws exceptions::server_exception if UUID_gen::is_valid_UUID() rejects
///         `raw`, or if the decoded UUID reports !is_timestamp().
inline utils::UUID get_valid_timeuuid(bytes raw) {
    if (utils::UUID_gen::is_valid_UUID(raw)) {
        auto decoded = utils::UUID_gen::get_UUID(raw);
        if (decoded.is_timestamp()) {
            return decoded;
        }
        throw exceptions::server_exception(format("{}: Not a timeuuid: version={}", decoded, decoded.version()));
    }
    throw exceptions::server_exception(format("invalid timeuuid: size={}", raw.size()));
}
|
||||
|
||||
inline
|
||||
shared_ptr<function>
|
||||
make_date_of_fct() {
|
||||
@@ -110,7 +128,7 @@ make_date_of_fct() {
|
||||
if (!bb) {
|
||||
return {};
|
||||
}
|
||||
auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(UUID_gen::get_UUID(*bb))));
|
||||
auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(get_valid_timeuuid(*bb))));
|
||||
return {timestamp_type->decompose(ts)};
|
||||
});
|
||||
}
|
||||
@@ -125,7 +143,7 @@ make_unix_timestamp_of_fct() {
|
||||
if (!bb) {
|
||||
return {};
|
||||
}
|
||||
return {long_type->decompose(UUID_gen::unix_timestamp(UUID_gen::get_UUID(*bb)))};
|
||||
return {long_type->decompose(UUID_gen::unix_timestamp(get_valid_timeuuid(*bb)))};
|
||||
});
|
||||
}
|
||||
|
||||
@@ -176,7 +194,7 @@ make_timeuuidtodate_fct() {
|
||||
if (!bb) {
|
||||
return {};
|
||||
}
|
||||
auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(UUID_gen::get_UUID(*bb))));
|
||||
auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(get_valid_timeuuid(*bb))));
|
||||
auto to_simple_date = get_castas_fctn(simple_date_type, timestamp_type);
|
||||
return {simple_date_type->decompose(to_simple_date(ts))};
|
||||
});
|
||||
@@ -211,7 +229,7 @@ make_timeuuidtotimestamp_fct() {
|
||||
if (!bb) {
|
||||
return {};
|
||||
}
|
||||
auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(UUID_gen::get_UUID(*bb))));
|
||||
auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(get_valid_timeuuid(*bb))));
|
||||
return {timestamp_type->decompose(ts)};
|
||||
});
|
||||
}
|
||||
@@ -245,10 +263,14 @@ make_timeuuidtounixtimestamp_fct() {
|
||||
if (!bb) {
|
||||
return {};
|
||||
}
|
||||
return {long_type->decompose(UUID_gen::unix_timestamp(UUID_gen::get_UUID(*bb)))};
|
||||
return {long_type->decompose(UUID_gen::unix_timestamp(get_valid_timeuuid(*bb)))};
|
||||
});
|
||||
}
|
||||
|
||||
/// Serializes the timestamp held by `v` as a 64-bit integer value.
///
/// The value is obtained through get_valid_timestamp(), so an out-of-range
/// timestamp propagates that function's exception instead of being serialized.
inline bytes time_point_to_long(const data_value& v) {
    auto checked = data_value(get_valid_timestamp(v));
    return checked.serialize();
}
|
||||
|
||||
inline
|
||||
shared_ptr<function>
|
||||
make_timestamptounixtimestamp_fct() {
|
||||
@@ -263,7 +285,7 @@ make_timestamptounixtimestamp_fct() {
|
||||
if (ts_obj.is_null()) {
|
||||
return {};
|
||||
}
|
||||
return {long_type->decompose(ts_obj)};
|
||||
return time_point_to_long(ts_obj);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -282,7 +304,7 @@ make_datetounixtimestamp_fct() {
|
||||
return {};
|
||||
}
|
||||
auto from_simple_date = get_castas_fctn(timestamp_type, simple_date_type);
|
||||
return {long_type->decompose(from_simple_date(simple_date_obj))};
|
||||
return time_point_to_long(from_simple_date(simple_date_obj));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -390,28 +390,45 @@ std::vector<const column_definition*> statement_restrictions::get_column_defs_fo
|
||||
if (need_filtering()) {
|
||||
auto& sim = db.find_column_family(_schema).get_index_manager();
|
||||
auto [opt_idx, _] = find_idx(sim);
|
||||
auto column_uses_indexing = [&opt_idx] (const column_definition* cdef) {
|
||||
return opt_idx && opt_idx->depends_on(*cdef);
|
||||
auto column_uses_indexing = [&opt_idx] (const column_definition* cdef, ::shared_ptr<single_column_restriction> restr) {
|
||||
return opt_idx && restr && restr->is_supported_by(*opt_idx);
|
||||
};
|
||||
auto single_pk_restrs = dynamic_pointer_cast<single_column_partition_key_restrictions>(_partition_key_restrictions);
|
||||
if (_partition_key_restrictions->needs_filtering(*_schema)) {
|
||||
for (auto&& cdef : _partition_key_restrictions->get_column_defs()) {
|
||||
if (!column_uses_indexing(cdef)) {
|
||||
::shared_ptr<single_column_restriction> restr;
|
||||
if (single_pk_restrs) {
|
||||
auto it = single_pk_restrs->restrictions().find(cdef);
|
||||
if (it != single_pk_restrs->restrictions().end()) {
|
||||
restr = dynamic_pointer_cast<single_column_restriction>(it->second);
|
||||
}
|
||||
}
|
||||
if (!column_uses_indexing(cdef, restr)) {
|
||||
column_defs_for_filtering.emplace_back(cdef);
|
||||
}
|
||||
}
|
||||
}
|
||||
auto single_ck_restrs = dynamic_pointer_cast<single_column_clustering_key_restrictions>(_clustering_columns_restrictions);
|
||||
const bool pk_has_unrestricted_components = _partition_key_restrictions->has_unrestricted_components(*_schema);
|
||||
if (pk_has_unrestricted_components || _clustering_columns_restrictions->needs_filtering(*_schema)) {
|
||||
column_id first_filtering_id = pk_has_unrestricted_components ? 0 : _schema->clustering_key_columns().begin()->id +
|
||||
_clustering_columns_restrictions->num_prefix_columns_that_need_not_be_filtered();
|
||||
for (auto&& cdef : _clustering_columns_restrictions->get_column_defs()) {
|
||||
if (cdef->id >= first_filtering_id && !column_uses_indexing(cdef)) {
|
||||
::shared_ptr<single_column_restriction> restr;
|
||||
if (single_pk_restrs) {
|
||||
auto it = single_ck_restrs->restrictions().find(cdef);
|
||||
if (it != single_ck_restrs->restrictions().end()) {
|
||||
restr = dynamic_pointer_cast<single_column_restriction>(it->second);
|
||||
}
|
||||
}
|
||||
if (cdef->id >= first_filtering_id && !column_uses_indexing(cdef, restr)) {
|
||||
column_defs_for_filtering.emplace_back(cdef);
|
||||
}
|
||||
}
|
||||
}
|
||||
for (auto&& cdef : _nonprimary_key_restrictions->get_column_defs()) {
|
||||
if (!column_uses_indexing(cdef)) {
|
||||
auto restr = dynamic_pointer_cast<single_column_restriction>(_nonprimary_key_restrictions->get_restriction(*cdef));
|
||||
if (!column_uses_indexing(cdef, restr)) {
|
||||
column_defs_for_filtering.emplace_back(cdef);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -92,6 +92,14 @@ public:
|
||||
: abstract_function_selector(fun, std::move(arg_selectors))
|
||||
, _tfun(dynamic_pointer_cast<T>(fun)) {
|
||||
}
|
||||
|
||||
/// Returns the name reported by the wrapped function object `_tfun`.
const functions::function_name& name() const {
    auto& wrapped = *_tfun;
    return wrapped.name();
}
|
||||
|
||||
/// Describes this selector by the formatted name of its function.
virtual sstring assignment_testable_source_context() const override {
    const functions::function_name& fn_name = this->name();
    return format("{}", fn_name);
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -79,11 +79,6 @@ public:
|
||||
dynamic_pointer_cast<functions::aggregate_function>(func), std::move(arg_selectors))
|
||||
, _aggregate(fun()->new_aggregate()) {
|
||||
}
|
||||
|
||||
virtual sstring assignment_testable_source_context() const override {
|
||||
// FIXME:
|
||||
return "FIXME";
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -82,12 +82,6 @@ public:
|
||||
: abstract_function_selector_for<functions::scalar_function>(
|
||||
dynamic_pointer_cast<functions::scalar_function>(std::move(fun)), std::move(arg_selectors)) {
|
||||
}
|
||||
|
||||
virtual sstring assignment_testable_source_context() const override {
|
||||
// FIXME:
|
||||
return "FIXME";
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -111,7 +111,9 @@ lw_shared_ptr<query::read_command> cas_request::read_command() const {
|
||||
} else {
|
||||
ranges = query::clustering_range::deoverlap(std::move(ranges), clustering_key::tri_compare(*_schema));
|
||||
}
|
||||
query::partition_slice ps(std::move(ranges), *_schema, columns_to_read, update_parameters::options);
|
||||
auto options = update_parameters::options;
|
||||
options.set(query::partition_slice::option::always_return_static_content);
|
||||
query::partition_slice ps(std::move(ranges), *_schema, columns_to_read, options);
|
||||
ps.set_partition_row_limit(max_rows);
|
||||
return make_lw_shared<query::read_command>(_schema->id(), _schema->version(), std::move(ps));
|
||||
}
|
||||
|
||||
@@ -60,7 +60,6 @@ public:
|
||||
static constexpr query::partition_slice::option_set options = query::partition_slice::option_set::of<
|
||||
query::partition_slice::option::send_partition_key,
|
||||
query::partition_slice::option::send_clustering_key,
|
||||
query::partition_slice::option::always_return_static_content,
|
||||
query::partition_slice::option::collections_as_maps>();
|
||||
|
||||
// Holder for data for
|
||||
|
||||
@@ -1316,7 +1316,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
|
||||
v.emplace_back(iovec{ buf.get_write(), s});
|
||||
m += s;
|
||||
}
|
||||
return f.dma_write(max_size - rem, std::move(v)).then([&rem](size_t s) {
|
||||
return f.dma_write(max_size - rem, std::move(v), service::get_local_commitlog_priority()).then([&rem](size_t s) {
|
||||
rem -= s;
|
||||
return stop_iteration::no;
|
||||
});
|
||||
|
||||
@@ -691,8 +691,9 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, shutdown_announce_in_ms(this, "shutdown_announce_in_ms", value_status::Used, 2 * 1000, "Time a node waits after sending gossip shutdown message in milliseconds. Same as -Dcassandra.shutdown_announce_in_ms in cassandra.")
|
||||
, developer_mode(this, "developer_mode", value_status::Used, false, "Relax environment checks. Setting to true can reduce performance and reliability significantly.")
|
||||
, skip_wait_for_gossip_to_settle(this, "skip_wait_for_gossip_to_settle", value_status::Used, -1, "An integer to configure the wait for gossip to settle. -1: wait normally, 0: do not wait at all, n: wait for at most n polls. Same as -Dcassandra.skip_wait_for_gossip_to_settle in cassandra.")
|
||||
, force_gossip_generation(this, "force_gossip_generation", liveness::LiveUpdate, value_status::Used, -1 , "Force gossip to use the generation number provided by user")
|
||||
, experimental(this, "experimental", value_status::Used, false, "Set to true to unlock all experimental features.")
|
||||
, experimental_features(this, "experimental_features", value_status::Used, {}, "Unlock experimental features provided as the option arguments (possible values: 'lwt', 'cdc', 'udf'). Can be repeated.")
|
||||
, experimental_features(this, "experimental_features", value_status::Used, {}, "Unlock experimental features provided as the option arguments (possible values: 'lwt' and 'cdc'). Can be repeated.")
|
||||
, lsa_reclamation_step(this, "lsa_reclamation_step", value_status::Used, 1, "Minimum number of segments to reclaim in a single step")
|
||||
, prometheus_port(this, "prometheus_port", value_status::Used, 9180, "Prometheus port, set to zero to disable")
|
||||
, prometheus_address(this, "prometheus_address", value_status::Used, "0.0.0.0", "Prometheus listening address")
|
||||
@@ -855,7 +856,7 @@ const db::extensions& db::config::extensions() const {
|
||||
std::unordered_map<sstring, db::experimental_features_t::feature> db::experimental_features_t::map() {
|
||||
// We decided against using the construct-on-first-use idiom here:
|
||||
// https://github.com/scylladb/scylla/pull/5369#discussion_r353614807
|
||||
return {{"lwt", LWT}, {"udf", UDF}, {"cdc", CDC}};
|
||||
return {{"lwt", LWT}, {"cdc", CDC}};
|
||||
}
|
||||
|
||||
template struct utils::config_file::named_value<seastar::log_level>;
|
||||
|
||||
@@ -78,7 +78,7 @@ namespace db {
|
||||
|
||||
/// Enumeration of all valid values for the `experimental` config entry.
|
||||
struct experimental_features_t {
|
||||
enum feature { LWT, UDF, CDC };
|
||||
enum feature { LWT, CDC };
|
||||
static std::unordered_map<sstring, feature> map(); // See enum_option.
|
||||
};
|
||||
|
||||
@@ -269,6 +269,7 @@ public:
|
||||
named_value<uint32_t> shutdown_announce_in_ms;
|
||||
named_value<bool> developer_mode;
|
||||
named_value<int32_t> skip_wait_for_gossip_to_settle;
|
||||
named_value<int32_t> force_gossip_generation;
|
||||
named_value<bool> experimental;
|
||||
named_value<std::vector<enum_option<experimental_features_t>>> experimental_features;
|
||||
named_value<size_t> lsa_reclamation_step;
|
||||
|
||||
@@ -405,11 +405,8 @@ future<> manager::end_point_hints_manager::sender::do_send_one_mutation(frozen_m
|
||||
return _proxy.send_to_endpoint(std::move(m), end_point_key(), { }, write_type::SIMPLE, service::allow_hints::no);
|
||||
} else {
|
||||
manager_logger.trace("Endpoints set has changed and {} is no longer a replica. Mutating from scratch...", end_point_key());
|
||||
// FIXME: using 1h as infinite timeout. If a node is down, we should get an
|
||||
// unavailable exception.
|
||||
auto timeout = db::timeout_clock::now() + 1h;
|
||||
//FIXME: Add required frozen_mutation overloads
|
||||
return _proxy.mutate({m.fm.unfreeze(m.s)}, consistency_level::ALL, timeout, nullptr, empty_service_permit());
|
||||
return _proxy.mutate_hint_from_scratch(std::move(m));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -33,12 +33,14 @@ enum class schema_feature {
|
||||
// See https://github.com/scylladb/scylla/issues/4485
|
||||
DIGEST_INSENSITIVE_TO_EXPIRY,
|
||||
COMPUTED_COLUMNS,
|
||||
CDC_OPTIONS,
|
||||
};
|
||||
|
||||
using schema_features = enum_set<super_enum<schema_feature,
|
||||
schema_feature::VIEW_VIRTUAL_COLUMNS,
|
||||
schema_feature::DIGEST_INSENSITIVE_TO_EXPIRY,
|
||||
schema_feature::COMPUTED_COLUMNS
|
||||
schema_feature::COMPUTED_COLUMNS,
|
||||
schema_feature::CDC_OPTIONS
|
||||
>>;
|
||||
|
||||
}
|
||||
|
||||
@@ -294,19 +294,24 @@ schema_ptr tables() {
|
||||
}
|
||||
|
||||
// Holds Scylla-specific table metadata.
|
||||
schema_ptr scylla_tables() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_ptr scylla_tables(schema_features features) {
|
||||
static auto make = [] (bool has_cdc_options) -> schema_ptr {
|
||||
auto id = generate_legacy_id(NAME, SCYLLA_TABLES);
|
||||
return schema_builder(NAME, SCYLLA_TABLES, std::make_optional(id))
|
||||
auto sb = schema_builder(NAME, SCYLLA_TABLES, std::make_optional(id))
|
||||
.with_column("keyspace_name", utf8_type, column_kind::partition_key)
|
||||
.with_column("table_name", utf8_type, column_kind::clustering_key)
|
||||
.with_column("version", uuid_type)
|
||||
.with_column("cdc", map_type_impl::get_instance(utf8_type, utf8_type, false))
|
||||
.set_gc_grace_seconds(schema_gc_grace)
|
||||
.with_version(generate_schema_version(id))
|
||||
.build();
|
||||
}();
|
||||
return schema;
|
||||
.set_gc_grace_seconds(schema_gc_grace);
|
||||
if (has_cdc_options) {
|
||||
sb.with_column("cdc", map_type_impl::get_instance(utf8_type, utf8_type, false));
|
||||
sb.with_version(generate_schema_version(id, 1));
|
||||
} else {
|
||||
sb.with_version(generate_schema_version(id));
|
||||
}
|
||||
return sb.build();
|
||||
};
|
||||
static thread_local schema_ptr schemas[2] = { make(false), make(true) };
|
||||
return schemas[features.contains(schema_feature::CDC_OPTIONS)];
|
||||
}
|
||||
|
||||
// The "columns" table lists the definitions of all columns in all tables
|
||||
@@ -608,14 +613,28 @@ schema_ptr aggregates() {
|
||||
}
|
||||
#endif
|
||||
|
||||
static
|
||||
mutation
|
||||
redact_columns_for_missing_features(mutation m, schema_features features) {
|
||||
if (features.contains(schema_feature::CDC_OPTIONS)) {
|
||||
return std::move(m);
|
||||
}
|
||||
if (m.schema()->cf_name() != SCYLLA_TABLES) {
|
||||
return std::move(m);
|
||||
}
|
||||
slogger.debug("adjusting schema_tables mutation due to possible in-progress cluster upgrade");
|
||||
m.upgrade(scylla_tables(features));
|
||||
return std::move(m);
|
||||
}
|
||||
|
||||
/**
|
||||
* Read schema from system keyspace and calculate MD5 digest of every row, resulting digest
|
||||
* will be converted into UUID which would act as content-based version of the schema.
|
||||
*/
|
||||
future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features features)
|
||||
{
|
||||
auto map = [&proxy] (sstring table) {
|
||||
return db::system_keyspace::query_mutations(proxy, NAME, table).then([&proxy, table] (auto rs) {
|
||||
auto map = [&proxy, features] (sstring table) {
|
||||
return db::system_keyspace::query_mutations(proxy, NAME, table).then([&proxy, table, features] (auto rs) {
|
||||
auto s = proxy.local().get_db().local().find_schema(NAME, table);
|
||||
std::vector<mutation> mutations;
|
||||
for (auto&& p : rs->partitions()) {
|
||||
@@ -624,6 +643,7 @@ future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>&
|
||||
if (is_system_keyspace(partition_key)) {
|
||||
continue;
|
||||
}
|
||||
mut = redact_columns_for_missing_features(std::move(mut), features);
|
||||
mutations.emplace_back(std::move(mut));
|
||||
}
|
||||
return mutations;
|
||||
@@ -647,8 +667,8 @@ future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>&
|
||||
|
||||
future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<service::storage_proxy>& proxy, schema_features features)
|
||||
{
|
||||
auto map = [&proxy] (sstring table) {
|
||||
return db::system_keyspace::query_mutations(proxy, NAME, table).then([&proxy, table] (auto rs) {
|
||||
auto map = [&proxy, features] (sstring table) {
|
||||
return db::system_keyspace::query_mutations(proxy, NAME, table).then([&proxy, table, features] (auto rs) {
|
||||
auto s = proxy.local().get_db().local().find_schema(NAME, table);
|
||||
std::vector<canonical_mutation> results;
|
||||
for (auto&& p : rs->partitions()) {
|
||||
@@ -657,6 +677,7 @@ future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<
|
||||
if (is_system_keyspace(partition_key)) {
|
||||
continue;
|
||||
}
|
||||
mut = redact_columns_for_missing_features(std::move(mut), features);
|
||||
results.emplace_back(mut);
|
||||
}
|
||||
return results;
|
||||
@@ -669,6 +690,14 @@ future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<
|
||||
return map_reduce(all_table_names(features), map, std::vector<canonical_mutation>{}, reduce);
|
||||
}
|
||||
|
||||
std::vector<mutation>
|
||||
adjust_schema_for_schema_features(std::vector<mutation> schema, schema_features features) {
|
||||
for (auto& m : schema) {
|
||||
m = redact_columns_for_missing_features(m, features);
|
||||
}
|
||||
return std::move(schema);
|
||||
}
|
||||
|
||||
future<schema_result>
|
||||
read_schema_for_keyspaces(distributed<service::storage_proxy>& proxy, const sstring& schema_table_name, const std::set<sstring>& keyspace_names)
|
||||
{
|
||||
@@ -1673,7 +1702,19 @@ mutation make_scylla_tables_mutation(schema_ptr table, api::timestamp_type times
|
||||
auto ckey = clustering_key::from_singular(*s, table->cf_name());
|
||||
mutation m(scylla_tables(), pkey);
|
||||
m.set_clustered_cell(ckey, "version", utils::UUID(table->version()), timestamp);
|
||||
store_map(m, ckey, "cdc", timestamp, table->cdc_options().to_map());
|
||||
auto cdc_options = table->cdc_options().to_map();
|
||||
if (!cdc_options.empty()) {
|
||||
store_map(m, ckey, "cdc", timestamp, cdc_options);
|
||||
} else {
|
||||
// Avoid storing anything for cdc disabled, so we don't end up with
|
||||
// different digests on different nodes due to the other node redacting
|
||||
// the cdc column when the cdc cluster feature is disabled.
|
||||
//
|
||||
// Tombstones are not considered for schema digest, so this is okay (and
|
||||
// needed in order for disabling of cdc to have effect).
|
||||
auto& cdc_cdef = *scylla_tables()->get_column_definition("cdc");
|
||||
m.set_clustered_cell(ckey, cdc_cdef, atomic_cell::make_dead(timestamp, gc_clock::now()));
|
||||
}
|
||||
return m;
|
||||
}
|
||||
|
||||
|
||||
@@ -109,7 +109,7 @@ schema_ptr view_virtual_columns();
|
||||
schema_ptr dropped_columns();
|
||||
schema_ptr indexes();
|
||||
schema_ptr tables();
|
||||
schema_ptr scylla_tables();
|
||||
schema_ptr scylla_tables(schema_features features = schema_features::full());
|
||||
schema_ptr views();
|
||||
schema_ptr computed_columns();
|
||||
|
||||
@@ -154,6 +154,7 @@ future<> save_system_keyspace_schema();
|
||||
future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features);
|
||||
|
||||
future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<service::storage_proxy>& proxy, schema_features);
|
||||
std::vector<mutation> adjust_schema_for_schema_features(std::vector<mutation> schema, schema_features features);
|
||||
|
||||
future<schema_result_value_type>
|
||||
read_schema_partition_for_keyspace(distributed<service::storage_proxy>& proxy, const sstring& schema_table_name, const sstring& keyspace_name);
|
||||
|
||||
@@ -104,10 +104,10 @@ api::timestamp_type schema_creation_timestamp() {
|
||||
// FIXME: Make automatic by calculating from schema structure.
|
||||
static const uint16_t version_sequence_number = 1;
|
||||
|
||||
table_schema_version generate_schema_version(utils::UUID table_id) {
|
||||
table_schema_version generate_schema_version(utils::UUID table_id, uint16_t offset) {
|
||||
md5_hasher h;
|
||||
feed_hash(h, table_id);
|
||||
feed_hash(h, version_sequence_number);
|
||||
feed_hash(h, version_sequence_number + offset);
|
||||
return utils::UUID_gen::get_name_UUID(h.finalize());
|
||||
}
|
||||
|
||||
@@ -1748,7 +1748,7 @@ static void maybe_add_virtual_reader(schema_ptr s, database& db) {
|
||||
}
|
||||
|
||||
static bool maybe_write_in_user_memory(schema_ptr s, database& db) {
|
||||
return (s.get() == batchlog().get())
|
||||
return (s.get() == batchlog().get()) || (s.get() == paxos().get())
|
||||
|| s == v3::scylla_views_builds_in_progress();
|
||||
}
|
||||
|
||||
|
||||
@@ -152,7 +152,7 @@ schema_ptr aggregates();
|
||||
|
||||
}
|
||||
|
||||
table_schema_version generate_schema_version(utils::UUID table_id);
|
||||
table_schema_version generate_schema_version(utils::UUID table_id, uint16_t offset = 0);
|
||||
|
||||
// Only for testing.
|
||||
void minimal_setup(distributed<database>& db, distributed<cql3::query_processor>& qp);
|
||||
|
||||
@@ -307,7 +307,7 @@ deletable_row& view_updates::get_view_row(const partition_key& base_key, const c
|
||||
if (!cdef.is_computed()) {
|
||||
//FIXME(sarna): this legacy code is here for backward compatibility and should be removed
|
||||
// once "computed_columns feature" is supported by every node
|
||||
if (!service::get_local_storage_service().db().local().find_column_family(_base->id()).get_index_manager().is_index(*_base)) {
|
||||
if (!service::get_local_storage_service().db().local().find_column_family(_base->id()).get_index_manager().is_index(*_view)) {
|
||||
throw std::logic_error(format("Column {} doesn't exist in base and this view is not backing a secondary index", cdef.name_as_text()));
|
||||
}
|
||||
computed_value = token_column_computation().compute_value(*_base, base_key, update);
|
||||
@@ -879,7 +879,11 @@ future<stop_iteration> view_update_builder::on_results() {
|
||||
if (_update && !_update->is_end_of_partition()) {
|
||||
if (_update->is_clustering_row()) {
|
||||
apply_tracked_tombstones(_update_tombstone_tracker, _update->as_mutable_clustering_row());
|
||||
generate_update(std::move(*_update).as_clustering_row(), { });
|
||||
auto existing_tombstone = _existing_tombstone_tracker.current_tombstone();
|
||||
auto existing = existing_tombstone
|
||||
? std::optional<clustering_row>(std::in_place, _update->as_clustering_row().key(), row_tombstone(std::move(existing_tombstone)), row_marker(), ::row())
|
||||
: std::nullopt;
|
||||
generate_update(std::move(*_update).as_clustering_row(), std::move(existing));
|
||||
}
|
||||
return advance_updates();
|
||||
}
|
||||
|
||||
2
dist/debian/build_deb.sh
vendored
2
dist/debian/build_deb.sh
vendored
@@ -125,7 +125,7 @@ if [ -z "$TARGET" ]; then
|
||||
fi
|
||||
RELOC_PKG_FULLPATH=$(readlink -f $RELOC_PKG)
|
||||
RELOC_PKG_BASENAME=$(basename $RELOC_PKG)
|
||||
SCYLLA_VERSION=$(cat SCYLLA-VERSION-FILE)
|
||||
SCYLLA_VERSION=$(cat SCYLLA-VERSION-FILE | sed 's/\.rc/~rc/')
|
||||
SCYLLA_RELEASE=$(cat SCYLLA-RELEASE-FILE)
|
||||
|
||||
ln -fv $RELOC_PKG_FULLPATH ../$PRODUCT-server_$SCYLLA_VERSION-$SCYLLA_RELEASE.orig.tar.gz
|
||||
|
||||
1
dist/debian/debian/scylla-server.install
vendored
1
dist/debian/debian/scylla-server.install
vendored
@@ -4,7 +4,6 @@ etc/security/limits.d/scylla.conf
|
||||
etc/scylla.d/*.conf
|
||||
opt/scylladb/share/doc/scylla/*
|
||||
opt/scylladb/share/doc/scylla/licenses/
|
||||
usr/lib/systemd/system/*.service
|
||||
usr/lib/systemd/system/*.timer
|
||||
usr/lib/systemd/system/*.slice
|
||||
usr/bin/scylla
|
||||
|
||||
6
dist/debian/debian/scylla-server.postrm
vendored
6
dist/debian/debian/scylla-server.postrm
vendored
@@ -6,8 +6,12 @@ case "$1" in
|
||||
purge|remove)
|
||||
rm -rf /etc/systemd/system/scylla-housekeeping-daily.service.d/
|
||||
rm -rf /etc/systemd/system/scylla-housekeeping-restart.service.d/
|
||||
rm -rf /etc/systemd/system/scylla-server.service.d/
|
||||
rm -rf /etc/systemd/system/scylla-helper.slice.d/
|
||||
# We need to keep dependencies.conf and sysconfdir.conf on 'remove',
|
||||
# otherwise it will be missing after rollback.
|
||||
if [ "$1" = "purge" ]; then
|
||||
rm -rf /etc/systemd/system/scylla-server.service.d/
|
||||
fi
|
||||
;;
|
||||
esac
|
||||
|
||||
|
||||
2
dist/docker/redhat/scylla-jmx-service.sh
vendored
2
dist/docker/redhat/scylla-jmx-service.sh
vendored
@@ -2,4 +2,4 @@
|
||||
|
||||
source /etc/sysconfig/scylla-jmx
|
||||
|
||||
exec /opt/scylladb/scripts/jmx/scylla-jmx -l /opt/scylladb/scripts/jmx
|
||||
exec /opt/scylladb/jmx/scylla-jmx -l /opt/scylladb/jmx
|
||||
|
||||
4
dist/redhat/scylla.spec.mustache
vendored
4
dist/redhat/scylla.spec.mustache
vendored
@@ -17,6 +17,10 @@ Obsoletes: scylla-server < 1.1
|
||||
|
||||
%undefine _find_debuginfo_dwz_opts
|
||||
|
||||
# Prevent find-debuginfo.sh from tampering with scylla's build-id (#5881)
|
||||
%undefine _unique_build_ids
|
||||
%global _no_recompute_build_ids 1
|
||||
|
||||
%description
|
||||
Scylla is a highly scalable, eventually consistent, distributed,
|
||||
partitioned row DB.
|
||||
|
||||
@@ -76,6 +76,9 @@ Scylla with issue #4139 fixed)
|
||||
bit 4: CorrectEmptyCounters (if set, indicates the sstable was generated by
|
||||
Scylla with issue #4363 fixed)
|
||||
|
||||
bit 5: CorrectUDTsInCollections (if set, indicates that the sstable was generated
|
||||
by Scylla with issue #6130 fixed)
|
||||
|
||||
## extension_attributes subcomponent
|
||||
|
||||
extension_attributes = extension_attribute_count extension_attribute*
|
||||
|
||||
@@ -98,6 +98,13 @@ public:
|
||||
sstring get_message() const { return what(); }
|
||||
};
|
||||
|
||||
class server_exception : public cassandra_exception {
|
||||
public:
|
||||
server_exception(sstring msg) noexcept
|
||||
: exceptions::cassandra_exception{exceptions::exception_code::SERVER_ERROR, std::move(msg)}
|
||||
{ }
|
||||
};
|
||||
|
||||
class protocol_exception : public cassandra_exception {
|
||||
public:
|
||||
protocol_exception(sstring msg) noexcept
|
||||
|
||||
@@ -1622,11 +1622,15 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map<application_stat
|
||||
// message on all cpus and forward them to cpu0 to process.
|
||||
return get_gossiper().invoke_on_all([do_bind] (gossiper& g) {
|
||||
g.init_messaging_service_handler(do_bind);
|
||||
}).then([this, generation_nbr, preload_local_states] {
|
||||
}).then([this, generation_nbr, preload_local_states] () mutable {
|
||||
build_seeds_list();
|
||||
/* initialize the heartbeat state for this localEndpoint */
|
||||
maybe_initialize_local_state(generation_nbr);
|
||||
if (_cfg.force_gossip_generation() > 0) {
|
||||
generation_nbr = _cfg.force_gossip_generation();
|
||||
logger.warn("Use the generation number provided by user: generation = {}", generation_nbr);
|
||||
}
|
||||
endpoint_state& local_state = endpoint_state_map[get_broadcast_address()];
|
||||
local_state.set_heart_beat_state_and_update_timestamp(heart_beat_state(generation_nbr));
|
||||
local_state.mark_alive();
|
||||
for (auto& entry : preload_local_states) {
|
||||
local_state.add_application_state(entry.first, entry.second);
|
||||
}
|
||||
@@ -1831,7 +1835,8 @@ future<> gossiper::do_stop_gossiping() {
|
||||
if (my_ep_state && !is_silent_shutdown_state(*my_ep_state)) {
|
||||
logger.info("Announcing shutdown");
|
||||
add_local_application_state(application_state::STATUS, _value_factory.shutdown(true)).get();
|
||||
for (inet_address addr : _live_endpoints) {
|
||||
auto live_endpoints = _live_endpoints;
|
||||
for (inet_address addr : live_endpoints) {
|
||||
msg_addr id = get_msg_addr(addr);
|
||||
logger.trace("Sending a GossipShutdown to {}", id);
|
||||
ms().send_gossip_shutdown(id, get_broadcast_address()).then_wrapped([id] (auto&&f) {
|
||||
|
||||
@@ -53,13 +53,13 @@ std::vector<inet_address> simple_strategy::calculate_natural_endpoints(const tok
|
||||
endpoints.reserve(replicas);
|
||||
|
||||
for (auto& token : tm.ring_range(t)) {
|
||||
if (endpoints.size() == replicas) {
|
||||
break;
|
||||
}
|
||||
auto ep = tm.get_endpoint(token);
|
||||
assert(ep);
|
||||
|
||||
endpoints.push_back(*ep);
|
||||
if (endpoints.size() == replicas) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return std::move(endpoints.get_vector());
|
||||
|
||||
10
main.cc
10
main.cc
@@ -54,6 +54,7 @@
|
||||
#include <seastar/core/file.hh>
|
||||
#include <sys/time.h>
|
||||
#include <sys/resource.h>
|
||||
#include <sys/prctl.h>
|
||||
#include "disk-error-handler.hh"
|
||||
#include "tracing/tracing.hh"
|
||||
#include "tracing/tracing_backend_registry.hh"
|
||||
@@ -464,6 +465,15 @@ inline auto defer_with_log_on_error(Func&& func) {
|
||||
}
|
||||
|
||||
int main(int ac, char** av) {
|
||||
// Allow core dumps. They would be disabled by default if
|
||||
// CAP_SYS_NICE was added to the binary, as is suggested by the
|
||||
// epoll backend.
|
||||
int r = prctl(PR_SET_DUMPABLE, 1, 0, 0, 0);
|
||||
if (r) {
|
||||
std::cerr << "Could not make scylla dumpable\n";
|
||||
exit(1);
|
||||
}
|
||||
|
||||
int return_value = 0;
|
||||
try {
|
||||
// early check to avoid triggering
|
||||
|
||||
@@ -39,6 +39,9 @@
|
||||
#include <seastar/core/execution_stage.hh>
|
||||
#include "types/map.hh"
|
||||
#include "compaction_garbage_collector.hh"
|
||||
#include "utils/exceptions.hh"
|
||||
|
||||
logging::logger mplog("mutation_partition");
|
||||
|
||||
template<bool reversed>
|
||||
struct reversal_traits;
|
||||
@@ -1236,7 +1239,9 @@ row::apply_monotonically(const column_definition& column, atomic_cell_or_collect
|
||||
void
|
||||
row::append_cell(column_id id, atomic_cell_or_collection value) {
|
||||
if (_type == storage_type::vector && id < max_vector_size) {
|
||||
assert(_storage.vector.v.size() <= id);
|
||||
if (_storage.vector.v.size() > id) {
|
||||
on_internal_error(mplog, format("Attempted to append cell#{} to row already having {} cells", id, _storage.vector.v.size()));
|
||||
}
|
||||
_storage.vector.v.resize(id);
|
||||
_storage.vector.v.emplace_back(cell_and_hash{std::move(value), cell_hash_opt()});
|
||||
_storage.vector.present.set(id);
|
||||
|
||||
@@ -177,6 +177,13 @@ future<> multishard_writer::distribute_mutation_fragments() {
|
||||
return handle_end_of_stream();
|
||||
}
|
||||
});
|
||||
}).handle_exception([this] (std::exception_ptr ep) {
|
||||
for (auto& q : _queue_reader_handles) {
|
||||
if (q) {
|
||||
q->abort(ep);
|
||||
}
|
||||
}
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -444,10 +444,14 @@ class repair_writer {
|
||||
uint64_t _estimated_partitions;
|
||||
size_t _nr_peer_nodes;
|
||||
// Needs more than one for repair master
|
||||
std::vector<std::optional<future<uint64_t>>> _writer_done;
|
||||
std::vector<std::optional<future<>>> _writer_done;
|
||||
std::vector<std::optional<seastar::queue<mutation_fragment_opt>>> _mq;
|
||||
// Current partition written to disk
|
||||
std::vector<lw_shared_ptr<const decorated_key_with_hash>> _current_dk_written_to_sstable;
|
||||
// Is current partition still open. A partition is opened when a
|
||||
// partition_start is written and is closed when a partition_end is
|
||||
// written.
|
||||
std::vector<bool> _partition_opened;
|
||||
public:
|
||||
repair_writer(
|
||||
schema_ptr schema,
|
||||
@@ -462,10 +466,13 @@ public:
|
||||
future<> write_start_and_mf(lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf, unsigned node_idx) {
|
||||
_current_dk_written_to_sstable[node_idx] = dk;
|
||||
if (mf.is_partition_start()) {
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf)));
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf))).then([this, node_idx] {
|
||||
_partition_opened[node_idx] = true;
|
||||
});
|
||||
} else {
|
||||
auto start = mutation_fragment(partition_start(dk->dk, tombstone()));
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(start))).then([this, node_idx, mf = std::move(mf)] () mutable {
|
||||
_partition_opened[node_idx] = true;
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf)));
|
||||
});
|
||||
}
|
||||
@@ -475,6 +482,7 @@ public:
|
||||
_writer_done.resize(_nr_peer_nodes);
|
||||
_mq.resize(_nr_peer_nodes);
|
||||
_current_dk_written_to_sstable.resize(_nr_peer_nodes);
|
||||
_partition_opened.resize(_nr_peer_nodes, false);
|
||||
}
|
||||
|
||||
void create_writer(unsigned node_idx) {
|
||||
@@ -516,7 +524,24 @@ public:
|
||||
return consumer(std::move(reader));
|
||||
});
|
||||
},
|
||||
t.stream_in_progress());
|
||||
t.stream_in_progress()).then([this, node_idx] (uint64_t partitions) {
|
||||
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
|
||||
_schema->ks_name(), _schema->cf_name(), partitions);
|
||||
}).handle_exception([this, node_idx] (std::exception_ptr ep) {
|
||||
rlogger.warn("repair_writer: keyspace={}, table={}, multishard_writer failed: {}",
|
||||
_schema->ks_name(), _schema->cf_name(), ep);
|
||||
_mq[node_idx]->abort(ep);
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
|
||||
future<> write_partition_end(unsigned node_idx) {
|
||||
if (_partition_opened[node_idx]) {
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment(partition_end())).then([this, node_idx] {
|
||||
_partition_opened[node_idx] = false;
|
||||
});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> do_write(unsigned node_idx, lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf) {
|
||||
@@ -524,7 +549,7 @@ public:
|
||||
if (_current_dk_written_to_sstable[node_idx]->dk.equal(*_schema, dk->dk)) {
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf)));
|
||||
} else {
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment(partition_end())).then([this,
|
||||
return write_partition_end(node_idx).then([this,
|
||||
node_idx, dk = std::move(dk), mf = std::move(mf)] () mutable {
|
||||
return write_start_and_mf(std::move(dk), std::move(mf), node_idx);
|
||||
});
|
||||
@@ -534,21 +559,33 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
future<> write_end_of_stream(unsigned node_idx) {
|
||||
if (_mq[node_idx]) {
|
||||
// Partition_end is never sent on wire, so we have to write one ourselves.
|
||||
return write_partition_end(node_idx).then([this, node_idx] () mutable {
|
||||
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt());
|
||||
});
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
}
|
||||
|
||||
future<> do_wait_for_writer_done(unsigned node_idx) {
|
||||
if (_writer_done[node_idx]) {
|
||||
return std::move(*(_writer_done[node_idx]));
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
}
|
||||
|
||||
future<> wait_for_writer_done() {
|
||||
return parallel_for_each(boost::irange(unsigned(0), unsigned(_nr_peer_nodes)), [this] (unsigned node_idx) {
|
||||
if (_writer_done[node_idx] && _mq[node_idx]) {
|
||||
// Partition_end is never sent on wire, so we have to write one ourselves.
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment(partition_end())).then([this, node_idx] () mutable {
|
||||
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt()).then([this, node_idx] () mutable {
|
||||
return (*_writer_done[node_idx]).then([] (uint64_t partitions) {
|
||||
rlogger.debug("Managed to write partitions={} to sstable", partitions);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
return when_all_succeed(write_end_of_stream(node_idx), do_wait_for_writer_done(node_idx));
|
||||
}).handle_exception([this] (std::exception_ptr ep) {
|
||||
rlogger.warn("repair_writer: keyspace={}, table={}, wait_for_writer_done failed: {}",
|
||||
_schema->ks_name(), _schema->cf_name(), ep);
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
};
|
||||
@@ -2174,7 +2211,7 @@ class row_level_repair {
|
||||
|
||||
// If the total size of the `_row_buf` on either of the nodes is zero,
|
||||
// we set this flag, which is an indication that rows are not synced.
|
||||
bool _zero_rows;
|
||||
bool _zero_rows = false;
|
||||
|
||||
// Sum of estimated_partitions on all peers
|
||||
uint64_t _estimated_partitions = 0;
|
||||
|
||||
@@ -288,10 +288,10 @@ schema::schema(const raw_schema& raw, std::optional<raw_view_info> raw_view_info
|
||||
+ column_offset(column_kind::regular_column),
|
||||
_raw._columns.end(), column_definition::name_comparator(regular_column_name_type()));
|
||||
|
||||
std::sort(_raw._columns.begin(),
|
||||
std::stable_sort(_raw._columns.begin(),
|
||||
_raw._columns.begin() + column_offset(column_kind::clustering_key),
|
||||
[] (auto x, auto y) { return x.id < y.id; });
|
||||
std::sort(_raw._columns.begin() + column_offset(column_kind::clustering_key),
|
||||
std::stable_sort(_raw._columns.begin() + column_offset(column_kind::clustering_key),
|
||||
_raw._columns.begin() + column_offset(column_kind::static_column),
|
||||
[] (auto x, auto y) { return x.id < y.id; });
|
||||
|
||||
|
||||
@@ -109,7 +109,10 @@ std::optional<std::map<sstring, sstring>> schema_mutations::cdc_options() const
|
||||
if (_scylla_tables) {
|
||||
auto rs = query::result_set(*_scylla_tables);
|
||||
if (!rs.empty()) {
|
||||
return db::schema_tables::get_map<sstring, sstring>(rs.row(0), "cdc");
|
||||
auto map = db::schema_tables::get_map<sstring, sstring>(rs.row(0), "cdc");
|
||||
if (map && !map->empty()) {
|
||||
return map;
|
||||
}
|
||||
}
|
||||
}
|
||||
return { };
|
||||
|
||||
@@ -58,7 +58,8 @@ EOS
|
||||
# For systems with not a lot of memory, override default reservations for the slices
|
||||
# seastar has a minimum reservation of 1.5GB that kicks in, and 21GB * 0.07 = 1.5GB.
|
||||
# So for anything smaller than that we will not use percentages in the helper slice
|
||||
MEMTOTAL_BYTES=$(cat /proc/meminfo | grep MemTotal | awk '{print $2 * 1024}')
|
||||
MEMTOTAL=$(cat /proc/meminfo |grep -e "^MemTotal:"|sed -s 's/^MemTotal:\s*\([0-9]*\) kB$/\1/')
|
||||
MEMTOTAL_BYTES=$(($MEMTOTAL * 1024))
|
||||
if [ $MEMTOTAL_BYTES -lt 23008753371 ]; then
|
||||
mkdir -p /etc/systemd/system/scylla-helper.slice.d/
|
||||
cat << EOS > /etc/systemd/system/scylla-helper.slice.d/memory.conf
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 8e236efda9...c8668e98bd
@@ -93,6 +93,7 @@ void migration_manager::init_messaging_service()
|
||||
|
||||
_feature_listeners.push_back(ss.cluster_supports_view_virtual_columns().when_enabled(update_schema));
|
||||
_feature_listeners.push_back(ss.cluster_supports_digest_insensitive_to_expiry().when_enabled(update_schema));
|
||||
_feature_listeners.push_back(ss.cluster_supports_cdc().when_enabled(update_schema));
|
||||
|
||||
auto& ms = netw::get_local_messaging_service();
|
||||
ms.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector<frozen_mutation> fm, rpc::optional<std::vector<canonical_mutation>> cm) {
|
||||
@@ -311,7 +312,8 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr
|
||||
try {
|
||||
for (const auto& cm : canonical_mutations) {
|
||||
auto& tbl = db.find_column_family(cm.column_family_id());
|
||||
mutations.emplace_back(cm.to_mutation(tbl.schema()));
|
||||
mutations.emplace_back(cm.to_mutation(
|
||||
tbl.schema()));
|
||||
}
|
||||
} catch (no_such_column_family& e) {
|
||||
mlogger.error("Error while applying schema mutations from {}: {}", src, e);
|
||||
@@ -902,8 +904,9 @@ future<> migration_manager::announce(std::vector<mutation> mutations, bool annou
|
||||
future<> migration_manager::push_schema_mutation(const gms::inet_address& endpoint, const std::vector<mutation>& schema)
|
||||
{
|
||||
netw::messaging_service::msg_addr id{endpoint, 0};
|
||||
auto fm = std::vector<frozen_mutation>(schema.begin(), schema.end());
|
||||
auto cm = std::vector<canonical_mutation>(schema.begin(), schema.end());
|
||||
auto adjusted_schema = db::schema_tables::adjust_schema_for_schema_features(schema, get_local_storage_service().cluster_schema_features());
|
||||
auto fm = std::vector<frozen_mutation>(adjusted_schema.begin(), adjusted_schema.end());
|
||||
auto cm = std::vector<canonical_mutation>(adjusted_schema.begin(), adjusted_schema.end());
|
||||
return netw::get_local_messaging_service().send_definitions_update(id, std::move(fm), std::move(cm));
|
||||
}
|
||||
|
||||
|
||||
@@ -38,7 +38,12 @@ private:
|
||||
public:
|
||||
query_state(client_state& client_state, service_permit permit)
|
||||
: _client_state(client_state)
|
||||
, _trace_state_ptr(_client_state.get_trace_state())
|
||||
, _trace_state_ptr(tracing::trace_state_ptr())
|
||||
, _permit(std::move(permit))
|
||||
{ }
|
||||
query_state(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit)
|
||||
: _client_state(client_state)
|
||||
, _trace_state_ptr(std::move(trace_state))
|
||||
, _permit(std::move(permit))
|
||||
{ }
|
||||
|
||||
|
||||
@@ -2183,6 +2183,14 @@ future<> storage_proxy::send_to_endpoint(
|
||||
allow_hints);
|
||||
}
|
||||
|
||||
future<> storage_proxy::mutate_hint_from_scratch(frozen_mutation_and_schema fm_a_s) {
|
||||
// FIXME: using 1h as infinite timeout. If a node is down, we should get an
|
||||
// unavailable exception.
|
||||
const auto timeout = db::timeout_clock::now() + 1h;
|
||||
std::array<mutation, 1> ms{fm_a_s.fm.unfreeze(fm_a_s.s)};
|
||||
return mutate_internal(std::move(ms), db::consistency_level::ALL, false, nullptr, empty_service_permit(), timeout);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send the mutations to the right targets, write it locally if it corresponds or writes a hint when the node
|
||||
* is not available.
|
||||
@@ -3935,7 +3943,7 @@ storage_proxy::do_query_with_paxos(schema_ptr s,
|
||||
return make_ready_future<storage_proxy::coordinator_query_result>(f.get0());
|
||||
} catch (request_timeout_exception& ex) {
|
||||
_stats.cas_read_timeouts.mark();
|
||||
return make_exception_future<storage_proxy::coordinator_query_result>(std::move(ex));
|
||||
return make_exception_future<storage_proxy::coordinator_query_result>(std::current_exception());
|
||||
} catch (exceptions::unavailable_exception& ex) {
|
||||
_stats.cas_read_unavailables.mark();
|
||||
return make_exception_future<storage_proxy::coordinator_query_result>(std::move(ex));
|
||||
@@ -4062,7 +4070,7 @@ future<bool> storage_proxy::cas(schema_ptr schema, shared_ptr<cas_request> reque
|
||||
return make_ready_future<bool>(f.get0());
|
||||
} catch (request_timeout_exception& ex) {
|
||||
_stats.cas_write_timeouts.mark();
|
||||
return make_exception_future<bool>(std::move(ex));
|
||||
return make_exception_future<bool>(std::current_exception());
|
||||
} catch (exceptions::unavailable_exception& ex) {
|
||||
_stats.cas_write_unavailables.mark();
|
||||
return make_exception_future<bool>(std::move(ex));
|
||||
|
||||
@@ -459,6 +459,8 @@ public:
|
||||
*/
|
||||
future<> mutate_atomically(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit);
|
||||
|
||||
future<> mutate_hint_from_scratch(frozen_mutation_and_schema fm_a_s);
|
||||
|
||||
// Send a mutation to one specific remote target.
|
||||
// Inspired by Cassandra's StorageProxy.sendToHintedEndpoints but without
|
||||
// hinted handoff support, and just one target. See also
|
||||
|
||||
@@ -1440,7 +1440,8 @@ future<> storage_service::drain_on_shutdown() {
|
||||
ss._sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::stop).get();
|
||||
slogger.info("Drain on shutdown: system distributed keyspace stopped");
|
||||
|
||||
get_storage_proxy().invoke_on_all([&ss] (storage_proxy& local_proxy) mutable {
|
||||
get_storage_proxy().invoke_on_all([] (storage_proxy& local_proxy) mutable {
|
||||
auto& ss = service::get_local_storage_service();
|
||||
ss.unregister_subscriber(&local_proxy);
|
||||
return local_proxy.drain_on_shutdown();
|
||||
}).get();
|
||||
@@ -3532,6 +3533,7 @@ db::schema_features storage_service::cluster_schema_features() const {
|
||||
f.set_if<db::schema_feature::VIEW_VIRTUAL_COLUMNS>(bool(_view_virtual_columns));
|
||||
f.set_if<db::schema_feature::DIGEST_INSENSITIVE_TO_EXPIRY>(bool(_digest_insensitive_to_expiry));
|
||||
f.set_if<db::schema_feature::COMPUTED_COLUMNS>(bool(_computed_columns));
|
||||
f.set_if<db::schema_feature::CDC_OPTIONS>(bool(_cdc_feature));
|
||||
return f;
|
||||
}
|
||||
|
||||
|
||||
@@ -2341,8 +2341,8 @@ public:
|
||||
return bool(_mc_sstable_feature);
|
||||
}
|
||||
|
||||
bool cluster_supports_cdc() const {
|
||||
return bool(_cdc_feature);
|
||||
const gms::feature& cluster_supports_cdc() const {
|
||||
return _cdc_feature;
|
||||
}
|
||||
|
||||
bool cluster_supports_row_level_repair() const {
|
||||
|
||||
@@ -72,47 +72,8 @@ private:
|
||||
static std::vector<column_info> build(
|
||||
const schema& s,
|
||||
const utils::chunked_vector<serialization_header::column_desc>& src,
|
||||
bool is_static) {
|
||||
std::vector<column_info> cols;
|
||||
if (s.is_dense()) {
|
||||
const column_definition& col = is_static ? *s.static_begin() : *s.regular_begin();
|
||||
cols.push_back(column_info{
|
||||
&col.name(),
|
||||
col.type,
|
||||
col.id,
|
||||
col.type->value_length_if_fixed(),
|
||||
col.is_multi_cell(),
|
||||
col.is_counter(),
|
||||
false
|
||||
});
|
||||
} else {
|
||||
cols.reserve(src.size());
|
||||
for (auto&& desc : src) {
|
||||
const bytes& type_name = desc.type_name.value;
|
||||
data_type type = db::marshal::type_parser::parse(to_sstring_view(type_name));
|
||||
const column_definition* def = s.get_column_definition(desc.name.value);
|
||||
std::optional<column_id> id;
|
||||
bool schema_mismatch = false;
|
||||
if (def) {
|
||||
id = def->id;
|
||||
schema_mismatch = def->is_multi_cell() != type->is_multi_cell() ||
|
||||
def->is_counter() != type->is_counter() ||
|
||||
!def->type->is_value_compatible_with(*type);
|
||||
}
|
||||
cols.push_back(column_info{
|
||||
&desc.name.value,
|
||||
type,
|
||||
id,
|
||||
type->value_length_if_fixed(),
|
||||
type->is_multi_cell(),
|
||||
type->is_counter(),
|
||||
schema_mismatch
|
||||
});
|
||||
}
|
||||
boost::range::stable_partition(cols, [](const column_info& column) { return !column.is_collection; });
|
||||
}
|
||||
return cols;
|
||||
}
|
||||
const sstable_enabled_features& features,
|
||||
bool is_static);
|
||||
|
||||
utils::UUID schema_uuid;
|
||||
std::vector<column_info> regular_schema_columns_from_sstable;
|
||||
@@ -125,10 +86,10 @@ private:
|
||||
state(state&&) = default;
|
||||
state& operator=(state&&) = default;
|
||||
|
||||
state(const schema& s, const serialization_header& header)
|
||||
state(const schema& s, const serialization_header& header, const sstable_enabled_features& features)
|
||||
: schema_uuid(s.version())
|
||||
, regular_schema_columns_from_sstable(build(s, header.regular_columns.elements, false))
|
||||
, static_schema_columns_from_sstable(build(s, header.static_columns.elements, true))
|
||||
, regular_schema_columns_from_sstable(build(s, header.regular_columns.elements, features, false))
|
||||
, static_schema_columns_from_sstable(build(s, header.static_columns.elements, features, true))
|
||||
, clustering_column_value_fix_lengths (get_clustering_values_fixed_lengths(header))
|
||||
{}
|
||||
};
|
||||
@@ -136,9 +97,10 @@ private:
|
||||
lw_shared_ptr<const state> _state = make_lw_shared<const state>();
|
||||
|
||||
public:
|
||||
column_translation get_for_schema(const schema& s, const serialization_header& header) {
|
||||
column_translation get_for_schema(
|
||||
const schema& s, const serialization_header& header, const sstable_enabled_features& features) {
|
||||
if (s.version() != _state->schema_uuid) {
|
||||
_state = make_lw_shared(state(s, header));
|
||||
_state = make_lw_shared(state(s, header, features));
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
@@ -38,6 +38,8 @@
|
||||
*/
|
||||
|
||||
#include "mp_row_consumer.hh"
|
||||
#include "column_translation.hh"
|
||||
#include "concrete_types.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
@@ -79,4 +81,86 @@ atomic_cell make_counter_cell(api::timestamp_type timestamp, bytes_view value) {
|
||||
return ccb.build(timestamp);
|
||||
}
|
||||
|
||||
// See #6130.
|
||||
static data_type freeze_types_in_collections(data_type t) {
|
||||
return ::visit(*t, make_visitor(
|
||||
[] (const map_type_impl& typ) -> data_type {
|
||||
return map_type_impl::get_instance(
|
||||
freeze_types_in_collections(typ.get_keys_type()->freeze()),
|
||||
freeze_types_in_collections(typ.get_values_type()->freeze()),
|
||||
typ.is_multi_cell());
|
||||
},
|
||||
[] (const set_type_impl& typ) -> data_type {
|
||||
return set_type_impl::get_instance(
|
||||
freeze_types_in_collections(typ.get_elements_type()->freeze()),
|
||||
typ.is_multi_cell());
|
||||
},
|
||||
[] (const list_type_impl& typ) -> data_type {
|
||||
return list_type_impl::get_instance(
|
||||
freeze_types_in_collections(typ.get_elements_type()->freeze()),
|
||||
typ.is_multi_cell());
|
||||
},
|
||||
[&] (const abstract_type& typ) -> data_type {
|
||||
return std::move(t);
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
/* If this function returns false, the caller cannot assume that the SSTable comes from Scylla.
|
||||
* It might, if for some reason a table was created using Scylla that didn't contain any feature bit,
|
||||
* but that should never happen. */
|
||||
static bool is_certainly_scylla_sstable(const sstable_enabled_features& features) {
|
||||
return features.enabled_features;
|
||||
}
|
||||
|
||||
std::vector<column_translation::column_info> column_translation::state::build(
|
||||
const schema& s,
|
||||
const utils::chunked_vector<serialization_header::column_desc>& src,
|
||||
const sstable_enabled_features& features,
|
||||
bool is_static) {
|
||||
std::vector<column_info> cols;
|
||||
if (s.is_dense()) {
|
||||
const column_definition& col = is_static ? *s.static_begin() : *s.regular_begin();
|
||||
cols.push_back(column_info{
|
||||
&col.name(),
|
||||
col.type,
|
||||
col.id,
|
||||
col.type->value_length_if_fixed(),
|
||||
col.is_multi_cell(),
|
||||
col.is_counter(),
|
||||
false
|
||||
});
|
||||
} else {
|
||||
cols.reserve(src.size());
|
||||
for (auto&& desc : src) {
|
||||
const bytes& type_name = desc.type_name.value;
|
||||
data_type type = db::marshal::type_parser::parse(to_sstring_view(type_name));
|
||||
if (!features.is_enabled(CorrectUDTsInCollections) && is_certainly_scylla_sstable(features)) {
|
||||
// See #6130.
|
||||
type = freeze_types_in_collections(std::move(type));
|
||||
}
|
||||
const column_definition* def = s.get_column_definition(desc.name.value);
|
||||
std::optional<column_id> id;
|
||||
bool schema_mismatch = false;
|
||||
if (def) {
|
||||
id = def->id;
|
||||
schema_mismatch = def->is_multi_cell() != type->is_multi_cell() ||
|
||||
def->is_counter() != type->is_counter() ||
|
||||
!def->type->is_value_compatible_with(*type);
|
||||
}
|
||||
cols.push_back(column_info{
|
||||
&desc.name.value,
|
||||
type,
|
||||
id,
|
||||
type->value_length_if_fixed(),
|
||||
type->is_multi_cell(),
|
||||
type->is_counter(),
|
||||
schema_mismatch
|
||||
});
|
||||
}
|
||||
boost::range::stable_partition(cols, [](const column_info& column) { return !column.is_collection; });
|
||||
}
|
||||
return cols;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1344,7 +1344,7 @@ public:
|
||||
, _consumer(consumer)
|
||||
, _sst(sst)
|
||||
, _header(sst->get_serialization_header())
|
||||
, _column_translation(sst->get_column_translation(s, _header))
|
||||
, _column_translation(sst->get_column_translation(s, _header, sst->features()))
|
||||
, _has_shadowable_tombstones(sst->has_shadowable_tombstones())
|
||||
{
|
||||
setup_columns(_regular_row, _column_translation.regular_columns());
|
||||
|
||||
@@ -780,8 +780,9 @@ public:
|
||||
const serialization_header& get_serialization_header() const {
|
||||
return get_mutable_serialization_header(*_components);
|
||||
}
|
||||
column_translation get_column_translation(const schema& s, const serialization_header& h) {
|
||||
return _column_translation.get_for_schema(s, h);
|
||||
column_translation get_column_translation(
|
||||
const schema& s, const serialization_header& h, const sstable_enabled_features& f) {
|
||||
return _column_translation.get_for_schema(s, h, f);
|
||||
}
|
||||
const std::vector<unsigned>& get_shards_for_this_sstable() const {
|
||||
return _shards;
|
||||
|
||||
@@ -459,7 +459,8 @@ enum sstable_feature : uint8_t {
|
||||
ShadowableTombstones = 2, // See #3885
|
||||
CorrectStaticCompact = 3, // See #4139
|
||||
CorrectEmptyCounters = 4, // See #4363
|
||||
End = 5,
|
||||
CorrectUDTsInCollections = 5, // See #6130
|
||||
End = 6,
|
||||
};
|
||||
|
||||
// Scylla-specific features enabled for a particular sstable.
|
||||
|
||||
@@ -77,3 +77,45 @@ BOOST_AUTO_TEST_CASE(test_make_random_uuid) {
|
||||
std::sort(uuids.begin(), uuids.end());
|
||||
BOOST_CHECK(std::unique(uuids.begin(), uuids.end()) == uuids.end());
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_get_time_uuid) {
|
||||
using namespace std::chrono;
|
||||
|
||||
auto uuid = utils::UUID_gen::get_time_UUID();
|
||||
BOOST_CHECK(uuid.is_timestamp());
|
||||
|
||||
auto tp = system_clock::now();
|
||||
uuid = utils::UUID_gen::get_time_UUID(tp);
|
||||
BOOST_CHECK(uuid.is_timestamp());
|
||||
|
||||
auto millis = duration_cast<milliseconds>(tp.time_since_epoch()).count();
|
||||
uuid = utils::UUID_gen::get_time_UUID(millis);
|
||||
BOOST_CHECK(uuid.is_timestamp());
|
||||
|
||||
auto unix_timestamp = utils::UUID_gen::unix_timestamp(uuid);
|
||||
BOOST_CHECK(unix_timestamp == millis);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_min_time_uuid) {
|
||||
using namespace std::chrono;
|
||||
|
||||
auto tp = system_clock::now();
|
||||
auto millis = duration_cast<milliseconds>(tp.time_since_epoch()).count();
|
||||
auto uuid = utils::UUID_gen::min_time_UUID(millis);
|
||||
BOOST_CHECK(uuid.is_timestamp());
|
||||
|
||||
auto unix_timestamp = utils::UUID_gen::unix_timestamp(uuid);
|
||||
BOOST_CHECK(unix_timestamp == millis);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_max_time_uuid) {
|
||||
using namespace std::chrono;
|
||||
|
||||
auto tp = system_clock::now();
|
||||
auto millis = duration_cast<milliseconds>(tp.time_since_epoch()).count();
|
||||
auto uuid = utils::UUID_gen::max_time_UUID(millis);
|
||||
BOOST_CHECK(uuid.is_timestamp());
|
||||
|
||||
auto unix_timestamp = utils::UUID_gen::unix_timestamp(uuid);
|
||||
BOOST_CHECK(unix_timestamp == millis);
|
||||
}
|
||||
|
||||
@@ -933,7 +933,6 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_cdc) {
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::CDC});
|
||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -943,17 +942,6 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_lwt) {
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::LWT});
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_parse_experimental_features_udf) {
|
||||
config cfg;
|
||||
cfg.read_from_yaml("experimental_features:\n - udf\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UDF});
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -963,7 +951,6 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_multiple) {
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::CDC, ef::LWT, ef::CDC}));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -976,7 +963,6 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_invalid) {
|
||||
BOOST_REQUIRE_NE(msg.find("line 2, column 7"), msg.npos);
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
});
|
||||
return make_ready_future();
|
||||
}
|
||||
@@ -986,7 +972,6 @@ SEASTAR_TEST_CASE(test_parse_experimental_true) {
|
||||
cfg.read_from_yaml("experimental: true", throw_on_error);
|
||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -995,6 +980,5 @@ SEASTAR_TEST_CASE(test_parse_experimental_false) {
|
||||
cfg.read_from_yaml("experimental: false", throw_on_error);
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -4263,3 +4263,272 @@ SEASTAR_TEST_CASE(test_rf_expand) {
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Test that tombstones with future timestamps work correctly
|
||||
// when a write with lower timestamp arrives - in such case,
|
||||
// if the base row is covered by such a tombstone, a view update
|
||||
// needs to take it into account. Refs #5793
|
||||
SEASTAR_TEST_CASE(test_views_with_future_tombstones) {
|
||||
return do_with_cql_env_thread([] (auto& e) {
|
||||
cquery_nofail(e, "CREATE TABLE t (a int, b int, c int, d int, e int, PRIMARY KEY (a,b,c));");
|
||||
cquery_nofail(e, "CREATE MATERIALIZED VIEW tv AS SELECT * FROM t"
|
||||
" WHERE a IS NOT NULL AND b IS NOT NULL AND c IS NOT NULL PRIMARY KEY (b,a,c);");
|
||||
|
||||
// Partition tombstone
|
||||
cquery_nofail(e, "delete from t using timestamp 10 where a=1;");
|
||||
auto msg = cquery_nofail(e, "select * from t;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
cquery_nofail(e, "insert into t (a,b,c,d,e) values (1,2,3,4,5) using timestamp 8;");
|
||||
msg = cquery_nofail(e, "select * from t;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
msg = cquery_nofail(e, "select * from tv;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
|
||||
// Range tombstone
|
||||
cquery_nofail(e, "delete from t using timestamp 16 where a=2 and b > 1 and b < 4;");
|
||||
msg = cquery_nofail(e, "select * from t;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
cquery_nofail(e, "insert into t (a,b,c,d,e) values (2,3,4,5,6) using timestamp 12;");
|
||||
msg = cquery_nofail(e, "select * from t;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
msg = cquery_nofail(e, "select * from tv;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
|
||||
// Row tombstone
|
||||
cquery_nofail(e, "delete from t using timestamp 24 where a=3 and b=4 and c=5;");
|
||||
msg = cquery_nofail(e, "select * from t;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
cquery_nofail(e, "insert into t (a,b,c,d,e) values (3,4,5,6,7) using timestamp 18;");
|
||||
msg = cquery_nofail(e, "select * from t;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
msg = cquery_nofail(e, "select * from tv;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
});
|
||||
}
|
||||
|
||||
// Executes "SELECT <fct>(<inp>) FROM t;" and expects the query to succeed.
//
// env - test environment used to run the query
// fct - name of the CQL function under test
// inp - text placed between the parentheses (column name, nested call, or empty)
// qo  - optional query options; when non-null they are moved into the call
// loc - call site, reported in the failure message
//
// Returns the query result, or a null pointer when the query threw. A failure
// is reported through BOOST_ERROR, so the calling test keeps running.
shared_ptr<cql_transport::messages::result_message> cql_func_require_nofail(
        cql_test_env& env,
        const seastar::sstring& fct,
        const seastar::sstring& inp,
        std::unique_ptr<cql3::query_options>&& qo = nullptr,
        const std::experimental::source_location& loc = std::experimental::source_location::current()) {
    auto query = format("SELECT {}({}) FROM t;", fct, inp);
    shared_ptr<cql_transport::messages::result_message> result(nullptr);
    try {
        if (!qo) {
            result = env.execute_cql(query).get0();
        } else {
            result = env.execute_cql(query, std::move(qo)).get0();
        }
        BOOST_TEST_MESSAGE(format("Query '{}' succeeded as expected", query));
    } catch (...) {
        BOOST_ERROR(format("query '{}' failed unexpectedly with error: {}\n{}:{}: originally from here",
                query, std::current_exception(),
                loc.file_name(), loc.line()));
    }
    return result;
}
|
||||
|
||||
// FIXME: should be in cql_assertions, but we don't want to call boost from cql_assertions.hh
|
||||
template <typename Exception>
|
||||
void cql_func_require_throw(
|
||||
cql_test_env& env,
|
||||
const seastar::sstring& fct,
|
||||
const seastar::sstring& inp,
|
||||
std::unique_ptr<cql3::query_options>&& qo = nullptr,
|
||||
const std::experimental::source_location& loc = std::experimental::source_location::current()) {
|
||||
auto query = format("SELECT {}({}) FROM t;", fct, inp);
|
||||
try {
|
||||
if (qo) {
|
||||
env.execute_cql(query, std::move(qo)).get();
|
||||
} else {
|
||||
env.execute_cql(query).get();
|
||||
}
|
||||
BOOST_ERROR(format("query '{}' succeeded unexpectedly\n{}:{}: originally from here", query,
|
||||
loc.file_name(), loc.line()));
|
||||
} catch (Exception& e) {
|
||||
BOOST_TEST_MESSAGE(format("Query '{}' failed as expected with error: {}", query, e));
|
||||
} catch (...) {
|
||||
BOOST_ERROR(format("query '{}' failed with unexpected error: {}\n{}:{}: originally from here",
|
||||
query, std::current_exception(),
|
||||
loc.file_name(), loc.line()));
|
||||
}
|
||||
}
|
||||
|
||||
// Creates table `t` with one column per type exercised by the time/uuid
// function tests (t timestamp, l bigint, f float, u timeuuid, d date),
// inserts a single row and reads the table back once.
static void create_time_uuid_fcts_schema(cql_test_env& e) {
    static const char* const setup_statements[] = {
        "CREATE TABLE t (id int primary key, t timestamp, l bigint, f float, u timeuuid, d date)",
        "INSERT INTO t (id, t, l, f, u, d) VALUES "
        "(1, 1579072460606, 1579072460606000, 1579072460606, a66525e0-3766-11ea-8080-808080808080, '2020-01-13')",
        "SELECT * FROM t;",
    };
    for (const char* stmt : setup_statements) {
        cquery_nofail(e, stmt);
    }
}
|
||||
|
||||
// Smoke test: each argument-less time/uuid function can be selected
// from the test table without raising an error.
SEASTAR_TEST_CASE(test_basic_time_uuid_fcts) {
    return do_with_cql_env_thread([] (auto& e) {
        create_time_uuid_fcts_schema(e);

        for (const char* fct : {"currenttime", "currentdate", "now", "currenttimeuuid", "currenttimestamp"}) {
            cql_func_require_nofail(e, fct, "");
        }
    });
}
|
||||
|
||||
// Checks which argument types each time/uuid conversion function accepts.
// Every function is fed the columns of the test table
// (t timestamp, l bigint, f float, u timeuuid, d date) and then the results
// of the argument-less generator functions; each call must either succeed
// or fail with the listed exception type.
SEASTAR_TEST_CASE(test_time_uuid_fcts_input_validation) {
    return do_with_cql_env_thread([] (auto& e) {
        using invalid_request = exceptions::invalid_request_exception;
        using server_error = exceptions::server_exception;

        create_time_uuid_fcts_schema(e);

        // Functions taking a timestamp.
        const auto check_timestamp_arg = [&e] (const sstring& fct) {
            cql_func_require_nofail(e, fct, "t");
            cql_func_require_throw<server_error>(e, fct, "l");
            cql_func_require_throw<invalid_request>(e, fct, "f");
            cql_func_require_throw<invalid_request>(e, fct, "u");
            cql_func_require_throw<invalid_request>(e, fct, "d");

            cql_func_require_throw<invalid_request>(e, fct, "currenttime()");
            cql_func_require_throw<invalid_request>(e, fct, "currentdate()");
            cql_func_require_throw<invalid_request>(e, fct, "now()");
            cql_func_require_throw<invalid_request>(e, fct, "currenttimeuuid()");
            cql_func_require_nofail(e, fct, "currenttimestamp()");
        };
        for (const char* fct : {"mintimeuuid", "maxtimeuuid"}) {
            check_timestamp_arg(fct);
        }

        // Functions taking a timeuuid.
        const auto check_timeuuid_arg = [&e] (const sstring& fct) {
            cql_func_require_throw<invalid_request>(e, fct, "t");
            cql_func_require_throw<invalid_request>(e, fct, "l");
            cql_func_require_throw<invalid_request>(e, fct, "f");
            cql_func_require_nofail(e, fct, "u");
            cql_func_require_throw<invalid_request>(e, fct, "d");

            cql_func_require_throw<invalid_request>(e, fct, "currenttime()");
            cql_func_require_throw<invalid_request>(e, fct, "currentdate()");
            cql_func_require_nofail(e, fct, "now()");
            cql_func_require_nofail(e, fct, "currenttimeuuid()");
            cql_func_require_throw<invalid_request>(e, fct, "currenttimestamp()");
        };
        for (const char* fct : {"dateof", "unixtimestampof"}) {
            check_timeuuid_arg(fct);
        }

        // Functions taking a timeuuid or a date.
        const auto check_timeuuid_or_date_arg = [&e] (const sstring& fct) {
            cql_func_require_throw<invalid_request>(e, fct, "t");
            cql_func_require_throw<invalid_request>(e, fct, "l");
            cql_func_require_throw<invalid_request>(e, fct, "f");
            cql_func_require_nofail(e, fct, "u");
            cql_func_require_nofail(e, fct, "d");

            cql_func_require_throw<invalid_request>(e, fct, "currenttime()");
            cql_func_require_nofail(e, fct, "currentdate()");
            cql_func_require_nofail(e, fct, "now()");
            cql_func_require_nofail(e, fct, "currenttimeuuid()");
            cql_func_require_throw<invalid_request>(e, fct, "currenttimestamp()");
        };
        check_timeuuid_or_date_arg("totimestamp");

        // Functions taking a timestamp or a timeuuid.
        const auto check_timestamp_or_timeuuid_arg = [&e] (const sstring& fct) {
            cql_func_require_nofail(e, fct, "t");
            // Any std::exception is accepted for the bigint column here.
            cql_func_require_throw<std::exception>(e, fct, "l");
            cql_func_require_throw<invalid_request>(e, fct, "f");
            cql_func_require_nofail(e, fct, "u");
            cql_func_require_throw<invalid_request>(e, fct, "d");

            cql_func_require_throw<invalid_request>(e, fct, "currenttime()");
            cql_func_require_throw<invalid_request>(e, fct, "currentdate()");
            cql_func_require_nofail(e, fct, "now()");
            cql_func_require_nofail(e, fct, "currenttimeuuid()");
            cql_func_require_nofail(e, fct, "currenttimestamp()");
        };
        check_timestamp_or_timeuuid_arg("todate");

        // Functions taking a timestamp, a timeuuid or a date.
        const auto check_timestamp_timeuuid_or_date_arg = [&e] (const sstring& fct) {
            cql_func_require_nofail(e, fct, "t");
            cql_func_require_throw<server_error>(e, fct, "l");
            cql_func_require_throw<invalid_request>(e, fct, "f");
            cql_func_require_nofail(e, fct, "u");
            cql_func_require_nofail(e, fct, "d");

            cql_func_require_throw<invalid_request>(e, fct, "currenttime()");
            cql_func_require_nofail(e, fct, "currentdate()");
            cql_func_require_nofail(e, fct, "now()");
            cql_func_require_nofail(e, fct, "currenttimeuuid()");
            cql_func_require_nofail(e, fct, "currenttimestamp()");
        };
        check_timestamp_timeuuid_or_date_arg("tounixtimestamp");
    });
}
|
||||
|
||||
// Checks how the time/uuid conversion functions compose: the result of each
// conversion function (applied to the timestamp column `t` or the timeuuid
// column `u` of the test table) is passed as the argument of another one,
// and the outer call must either succeed or fail with invalid_request_exception.
SEASTAR_TEST_CASE(test_time_uuid_fcts_result) {
    return do_with_cql_env_thread([] (auto& e) {
        using invalid_request = exceptions::invalid_request_exception;

        create_time_uuid_fcts_schema(e);

        // Outer functions taking a timestamp.
        const auto check_timestamp_arg = [&e] (const sstring& fct) {
            cql_func_require_throw<invalid_request>(e, fct, "mintimeuuid(t)");
            cql_func_require_throw<invalid_request>(e, fct, "maxtimeuuid(t)");
            cql_func_require_nofail(e, fct, "dateof(u)");
            cql_func_require_nofail(e, fct, "unixtimestampof(u)");
            cql_func_require_nofail(e, fct, "totimestamp(u)");
            cql_func_require_throw<invalid_request>(e, fct, "todate(u)");
            cql_func_require_nofail(e, fct, "tounixtimestamp(u)");
        };
        for (const char* fct : {"mintimeuuid", "maxtimeuuid"}) {
            check_timestamp_arg(fct);
        }

        // Outer functions taking a timeuuid.
        const auto check_timeuuid_arg = [&e] (const sstring& fct) {
            cql_func_require_nofail(e, fct, "mintimeuuid(t)");
            cql_func_require_nofail(e, fct, "maxtimeuuid(t)");
            cql_func_require_throw<invalid_request>(e, fct, "dateof(u)");
            cql_func_require_throw<invalid_request>(e, fct, "unixtimestampof(u)");
            cql_func_require_throw<invalid_request>(e, fct, "totimestamp(u)");
            cql_func_require_throw<invalid_request>(e, fct, "todate(u)");
            cql_func_require_throw<invalid_request>(e, fct, "tounixtimestamp(u)");
        };
        for (const char* fct : {"dateof", "unixtimestampof"}) {
            check_timeuuid_arg(fct);
        }

        // Outer functions taking a timeuuid or a date.
        const auto check_timeuuid_or_date_arg = [&e] (const sstring& fct) {
            cql_func_require_nofail(e, fct, "mintimeuuid(t)");
            cql_func_require_nofail(e, fct, "maxtimeuuid(t)");
            cql_func_require_throw<invalid_request>(e, fct, "dateof(u)");
            cql_func_require_throw<invalid_request>(e, fct, "unixtimestampof(u)");
            cql_func_require_throw<invalid_request>(e, fct, "totimestamp(u)");
            cql_func_require_nofail(e, fct, "todate(u)");
            cql_func_require_throw<invalid_request>(e, fct, "tounixtimestamp(u)");
        };
        check_timeuuid_or_date_arg("totimestamp");

        // Outer functions taking a timestamp or a timeuuid.
        // NOTE(review): this checker has no assertions, so "todate" is not
        // actually exercised by this test - confirm whether that is intentional
        // or whether the expectations still need to be filled in.
        const auto check_timestamp_or_timeuuid_arg = [] (const sstring&) {
        };
        check_timestamp_or_timeuuid_arg("todate");

        // Outer functions taking a timestamp, a timeuuid or a date.
        const auto check_timestamp_timeuuid_or_date_arg = [&e] (const sstring& fct) {
            cql_func_require_nofail(e, fct, "mintimeuuid(t)");
            cql_func_require_nofail(e, fct, "maxtimeuuid(t)");
            cql_func_require_nofail(e, fct, "dateof(u)");
            cql_func_require_nofail(e, fct, "unixtimestampof(u)");
            cql_func_require_nofail(e, fct, "totimestamp(u)");
            cql_func_require_nofail(e, fct, "todate(u)");
            cql_func_require_nofail(e, fct, "tounixtimestamp(u)");
        };
        check_timestamp_timeuuid_or_date_arg("tounixtimestamp");
    });
}
|
||||
|
||||
@@ -25,7 +25,7 @@
|
||||
#include <seastar/util/noncopyable_function.hh>
|
||||
|
||||
inline
|
||||
void eventually(noncopyable_function<void ()> f, size_t max_attempts = 12) {
|
||||
void eventually(noncopyable_function<void ()> f, size_t max_attempts = 17) {
|
||||
size_t attempts = 0;
|
||||
while (true) {
|
||||
try {
|
||||
@@ -43,7 +43,7 @@ void eventually(noncopyable_function<void ()> f, size_t max_attempts = 12) {
|
||||
|
||||
inline
|
||||
bool eventually_true(noncopyable_function<bool ()> f) {
|
||||
const unsigned max_attempts = 10;
|
||||
const unsigned max_attempts = 15;
|
||||
unsigned attempts = 0;
|
||||
while (true) {
|
||||
if (f()) {
|
||||
|
||||
@@ -118,6 +118,53 @@ SEASTAR_TEST_CASE(test_multishard_writer) {
|
||||
});
|
||||
}
|
||||
|
||||
// Feeds distribute_reader_and_consume_on_shards() from a producer that starts
// returning an exceptional future once it has been asked for more than ~800
// fragments, once with a well-behaved consumer and once with a consumer that
// fails immediately. The call is expected to fail and not block forever; the
// resulting exception is deliberately discarded.
SEASTAR_TEST_CASE(test_multishard_writer_producer_aborts) {
    return do_with_cql_env_thread([] (cql_test_env&) {
        auto run_with_failing_producer = [] (random_mutation_generator&& gen, size_t partition_nr, generate_error error = generate_error::no) {
            auto muts = gen(partition_nr);
            schema_ptr s = gen.schema();
            auto source_reader = partition_nr > 0 ? flat_mutation_reader_from_mutations(muts) : make_empty_flat_reader(s);

            // Forwards fragments from source_reader until the request counter
            // exceeds 800, then fails every subsequent request.
            int mf_produced = 0;
            auto produce_or_fail = [&source_reader, &mf_produced] () mutable {
                const bool limit_exceeded = mf_produced++ > 800;
                if (!limit_exceeded) {
                    return source_reader(db::no_timeout);
                }
                return make_exception_future<mutation_fragment_opt>(std::runtime_error("the producer failed"));
            };

            auto& partitioner = dht::global_partitioner();

            // Per-shard consumer: either fails right away (error == yes) or
            // drains its reader, checking that each partition it sees belongs
            // to the shard it is running on.
            auto consume_on_shard = [&partitioner, error] (flat_mutation_reader reader) {
                if (error) {
                    return make_exception_future<>(std::runtime_error("Failed to write"));
                }
                return repeat([&partitioner, reader = std::move(reader)] () mutable {
                    return reader(db::no_timeout).then([&partitioner] (mutation_fragment_opt mf_opt) {
                        if (!mf_opt) {
                            return make_ready_future<stop_iteration>(stop_iteration::yes);
                        }
                        if (mf_opt->is_partition_start()) {
                            auto shard = partitioner.shard_of(mf_opt->as_partition_start().key().token());
                            BOOST_REQUIRE_EQUAL(shard, this_shard_id());
                        }
                        return make_ready_future<stop_iteration>(stop_iteration::no);
                    });
                });
            };

            try {
                distribute_reader_and_consume_on_shards(s, partitioner,
                    make_generating_reader(s, std::move(produce_or_fail)),
                    std::move(consume_on_shard)
                ).get0();
            } catch (...) {
                // The distribute_reader_and_consume_on_shards is expected to fail and not block forever
            }
        };

        for (auto error : {generate_error::no, generate_error::yes}) {
            run_with_failing_producer(
                    random_mutation_generator(random_mutation_generator::generate_counters::no, local_shard_only::yes),
                    1000, error);
        }
    });
}
|
||||
|
||||
namespace {
|
||||
|
||||
class bucket_writer {
|
||||
|
||||
@@ -371,10 +371,8 @@ SEASTAR_TEST_CASE(test_merging_does_not_alter_tables_which_didnt_change) {
|
||||
muts2.push_back(db::schema_tables::make_scylla_tables_mutation(s0, api::new_timestamp()));
|
||||
mm.announce(muts2).get();
|
||||
|
||||
// SCYLLA_TABLES have additional columns so announcing its mutation
|
||||
// changes the tables
|
||||
BOOST_REQUIRE(s1 != find_table().schema());
|
||||
BOOST_REQUIRE(legacy_version != find_table().schema()->version());
|
||||
BOOST_REQUIRE(s1 == find_table().schema());
|
||||
BOOST_REQUIRE_EQUAL(legacy_version, find_table().schema()->version());
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -575,7 +573,7 @@ SEASTAR_TEST_CASE(test_prepared_statement_is_invalidated_by_schema_change) {
|
||||
|
||||
// We don't want schema digest to change between Scylla versions because that results in a schema disagreement
|
||||
// during rolling upgrade.
|
||||
future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_dir, std::set<sstring> disabled_features, std::vector<utils::UUID> expected_digests) {
|
||||
future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_dir, std::set<sstring> disabled_features, std::vector<utils::UUID> expected_digests, std::function<void(cql_test_env& e)> extra_schema_changes) {
|
||||
using namespace db;
|
||||
using namespace db::schema_tables;
|
||||
|
||||
@@ -588,7 +586,6 @@ future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_
|
||||
|
||||
auto db_cfg_ptr = make_shared<db::config>();
|
||||
auto& db_cfg = *db_cfg_ptr;
|
||||
db_cfg.experimental_features({experimental_features_t::UDF}, db::config::config_source::CommandLine);
|
||||
if (regenerate) {
|
||||
db_cfg.data_file_directories({data_dir}, db::config::config_source::CommandLine);
|
||||
} else {
|
||||
@@ -598,7 +595,7 @@ future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_
|
||||
cql_test_config cfg_in(db_cfg_ptr);
|
||||
cfg_in.disabled_features = std::move(disabled_features);
|
||||
|
||||
return do_with_cql_env_thread([regenerate, expected_digests = std::move(expected_digests)](cql_test_env& e) {
|
||||
return do_with_cql_env_thread([regenerate, expected_digests = std::move(expected_digests), extra_schema_changes = std::move(extra_schema_changes)] (cql_test_env& e) {
|
||||
if (regenerate) {
|
||||
// Exercise many different kinds of schema changes.
|
||||
e.execute_cql(
|
||||
@@ -614,6 +611,7 @@ future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_
|
||||
e.execute_cql(
|
||||
"create keyspace tests2 with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };").get();
|
||||
e.execute_cql("drop keyspace tests2;").get();
|
||||
extra_schema_changes(e);
|
||||
}
|
||||
|
||||
auto expect_digest = [&] (schema_features sf, utils::UUID expected) {
|
||||
@@ -674,7 +672,7 @@ SEASTAR_TEST_CASE(test_schema_digest_does_not_change) {
|
||||
utils::UUID("1d91ad22-ea7c-3e7f-9557-87f0f3bb94d7"),
|
||||
utils::UUID("2dcd4a37-cbb5-399b-b3c9-8eb1398b096b")
|
||||
};
|
||||
return test_schema_digest_does_not_change_with_disabled_features("./tests/sstables/schema_digest_test", std::set<sstring>{"COMPUTED_COLUMNS"}, std::move(expected_digests));
|
||||
return test_schema_digest_does_not_change_with_disabled_features("./tests/sstables/schema_digest_test", std::set<sstring>{"COMPUTED_COLUMNS", "CDC"}, std::move(expected_digests), [] (cql_test_env& e) {});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_schema_digest_does_not_change_after_computed_columns) {
|
||||
@@ -689,5 +687,26 @@ SEASTAR_TEST_CASE(test_schema_digest_does_not_change_after_computed_columns) {
|
||||
utils::UUID("d58e5214-516e-3d0b-95b5-01ab71584a8d"),
|
||||
utils::UUID("e1b50bed-2ab8-3759-92c7-1f4288046ae6")
|
||||
};
|
||||
return test_schema_digest_does_not_change_with_disabled_features("./tests/sstables/schema_digest_test_computed_columns", std::set<sstring>{}, std::move(expected_digests));
|
||||
return test_schema_digest_does_not_change_with_disabled_features("./tests/sstables/schema_digest_test_computed_columns", std::set<sstring>{"CDC"}, std::move(expected_digests), [] (cql_test_env& e) {});
|
||||
}
|
||||
|
||||
// Schema-digest stability check for a schema that also contains a table
// created with CDC enabled. No features are disabled for this run; the extra
// table is added through the helper's extra-schema-changes hook.
SEASTAR_TEST_CASE(test_schema_digest_does_not_change_with_cdc_options) {
    // Additional schema change applied on top of the helper's common ones.
    auto add_cdc_table = [] (cql_test_env& env) {
        env.execute_cql("create table tests.table_cdc (pk int primary key, c1 int, c2 int) with cdc = {'enabled':'true'};").get();
    };

    std::vector<utils::UUID> digests{
        utils::UUID("a1f07f31-59d6-372a-8c94-7ea467354b39"),
        utils::UUID("524d418d-a2e2-3fc3-bf45-5fb79b33c7e4"),
        utils::UUID("524d418d-a2e2-3fc3-bf45-5fb79b33c7e4"),
        utils::UUID("018fccba-8050-3bb9-a0a5-2b3c5f0371fe"),
        utils::UUID("018fccba-8050-3bb9-a0a5-2b3c5f0371fe"),
        utils::UUID("58f4254e-cc3b-3d56-8a45-167f9a3ea423"),
        utils::UUID("48fda4f8-d7b5-3e59-a47a-7397989a9bf8"),
        utils::UUID("8049bcfe-eb01-3a59-af33-16cef8a34b45"),
        utils::UUID("2195a821-b2b8-3cb8-a179-2f5042e90841")
    };

    return test_schema_digest_does_not_change_with_disabled_features(
            "./tests/sstables/schema_digest_test_cdc_options",
            std::set<sstring>{},
            std::move(digests),
            std::move(add_cdc_table));
}
|
||||
|
||||
@@ -5262,3 +5262,131 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_rows) {
|
||||
test_sstable_log_too_many_rows_f(random, (random + 1), false);
|
||||
test_sstable_log_too_many_rows_f((random + 1), random, true);
|
||||
}
|
||||
|
||||
// The following test runs on tests/sstables/3.x/uncompressed/legacy_udt_in_collection
|
||||
// It was created using Scylla 3.0.x using the following CQL statements:
|
||||
//
|
||||
// CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
|
||||
// CREATE TYPE ks.ut (a int, b int);
|
||||
// CREATE TABLE ks.t ( pk int PRIMARY KEY,
|
||||
// m map<int, frozen<ut>>,
|
||||
// fm frozen<map<int, frozen<ut>>>,
|
||||
// mm map<int, frozen<map<int, frozen<ut>>>>,
|
||||
// fmm frozen<map<int, frozen<map<int, frozen<ut>>>>>,
|
||||
// s set<frozen<ut>>,
|
||||
// fs frozen<set<frozen<ut>>>,
|
||||
// l list<frozen<ut>>,
|
||||
// fl frozen<list<frozen<ut>>>
|
||||
// ) WITH compression = {};
|
||||
// UPDATE ks.t USING TIMESTAMP 1525385507816568 SET
|
||||
// m[0] = {a: 0, b: 0},
|
||||
// fm = {0: {a: 0, b: 0}},
|
||||
// mm[0] = {0: {a: 0, b: 0}},
|
||||
// fmm = {0: {0: {a: 0, b: 0}}},
|
||||
// s = s + {{a: 0, b: 0}},
|
||||
// fs = {{a: 0, b: 0}},
|
||||
// l[scylla_timeuuid_list_index(7fb27e80-7b12-11ea-9fad-f4d108a9e4a3)] = {a: 0, b: 0},
|
||||
// fl = [{a: 0, b: 0}]
|
||||
// WHERE pk = 0;
|
||||
//
|
||||
// It checks that an SSTable containing UDTs nested in collections, whose serialization header is incorrect
|
||||
// (nested UDTs are not wrapped in the FrozenType<...> tag), can still be loaded by new versions of Scylla.
|
||||
|
||||
static const sstring LEGACY_UDT_IN_COLLECTION_PATH =
|
||||
"tests/sstables/3.x/uncompressed/legacy_udt_in_collection";
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_legacy_udt_in_collection_table) {
|
||||
auto abj = defer([] { await_background_jobs().get(); });
|
||||
|
||||
auto ut = user_type_impl::get_instance("ks", to_bytes("ut"),
|
||||
{to_bytes("a"), to_bytes("b")},
|
||||
{int32_type, int32_type}, false);
|
||||
auto m_type = map_type_impl::get_instance(int32_type, ut, true);
|
||||
auto fm_type = map_type_impl::get_instance(int32_type, ut, false);
|
||||
auto mm_type = map_type_impl::get_instance(int32_type, fm_type, true);
|
||||
auto fmm_type = map_type_impl::get_instance(int32_type, fm_type, false);
|
||||
auto s_type = set_type_impl::get_instance(ut, true);
|
||||
auto fs_type = set_type_impl::get_instance(ut, false);
|
||||
auto l_type = list_type_impl::get_instance(ut, true);
|
||||
auto fl_type = list_type_impl::get_instance(ut, false);
|
||||
|
||||
auto s = schema_builder("ks", "t")
|
||||
.with_column("pk", int32_type, column_kind::partition_key)
|
||||
.with_column("m", m_type)
|
||||
.with_column("fm", fm_type)
|
||||
.with_column("mm", mm_type)
|
||||
.with_column("fmm", fmm_type)
|
||||
.with_column("s", s_type)
|
||||
.with_column("fs", fs_type)
|
||||
.with_column("l", l_type)
|
||||
.with_column("fl", fl_type)
|
||||
.set_compressor_params(compression_parameters::no_compression())
|
||||
.build();
|
||||
|
||||
auto m_cdef = s->get_column_definition(to_bytes("m"));
|
||||
auto fm_cdef = s->get_column_definition(to_bytes("fm"));
|
||||
auto mm_cdef = s->get_column_definition(to_bytes("mm"));
|
||||
auto fmm_cdef = s->get_column_definition(to_bytes("fmm"));
|
||||
auto s_cdef = s->get_column_definition(to_bytes("s"));
|
||||
auto fs_cdef = s->get_column_definition(to_bytes("fs"));
|
||||
auto l_cdef = s->get_column_definition(to_bytes("l"));
|
||||
auto fl_cdef = s->get_column_definition(to_bytes("fl"));
|
||||
BOOST_REQUIRE(m_cdef && fm_cdef && mm_cdef && fmm_cdef && s_cdef && fs_cdef && l_cdef && fl_cdef);
|
||||
|
||||
auto ut_val = make_user_value(ut, {int32_t(0), int32_t(0)});
|
||||
auto fm_val = make_map_value(fm_type, {{int32_t(0), ut_val}});
|
||||
auto fmm_val = make_map_value(fmm_type, {{int32_t(0), fm_val}});
|
||||
auto fs_val = make_set_value(fs_type, {ut_val});
|
||||
auto fl_val = make_list_value(fl_type, {ut_val});
|
||||
|
||||
mutation mut{s, partition_key::from_deeply_exploded(*s, {0})};
|
||||
auto ckey = clustering_key::make_empty();
|
||||
|
||||
// m[0] = {a: 0, b: 0}
|
||||
{
|
||||
collection_mutation_description desc;
|
||||
desc.cells.emplace_back(int32_type->decompose(0),
|
||||
atomic_cell::make_live(*ut, write_timestamp, ut->decompose(ut_val), atomic_cell::collection_member::yes));
|
||||
mut.set_clustered_cell(ckey, *m_cdef, desc.serialize(*m_type));
|
||||
}
|
||||
|
||||
// fm = {0: {a: 0, b: 0}}
|
||||
mut.set_clustered_cell(ckey, *fm_cdef, atomic_cell::make_live(*fm_type, write_timestamp, fm_type->decompose(fm_val)));
|
||||
|
||||
// mm[0] = {0: {a: 0, b: 0}},
|
||||
{
|
||||
collection_mutation_description desc;
|
||||
desc.cells.emplace_back(int32_type->decompose(0),
|
||||
atomic_cell::make_live(*fm_type, write_timestamp, fm_type->decompose(fm_val), atomic_cell::collection_member::yes));
|
||||
mut.set_clustered_cell(ckey, *mm_cdef, desc.serialize(*mm_type));
|
||||
}
|
||||
|
||||
// fmm = {0: {0: {a: 0, b: 0}}},
|
||||
mut.set_clustered_cell(ckey, *fmm_cdef, atomic_cell::make_live(*fmm_type, write_timestamp, fmm_type->decompose(fmm_val)));
|
||||
|
||||
// s = s + {{a: 0, b: 0}},
|
||||
{
|
||||
collection_mutation_description desc;
|
||||
desc.cells.emplace_back(ut->decompose(ut_val),
|
||||
atomic_cell::make_live(*bytes_type, write_timestamp, bytes{}, atomic_cell::collection_member::yes));
|
||||
mut.set_clustered_cell(ckey, *s_cdef, desc.serialize(*s_type));
|
||||
}
|
||||
|
||||
// fs = {{a: 0, b: 0}},
|
||||
mut.set_clustered_cell(ckey, *fs_cdef, atomic_cell::make_live(*fs_type, write_timestamp, fs_type->decompose(fs_val)));
|
||||
|
||||
// l[scylla_timeuuid_list_index(7fb27e80-7b12-11ea-9fad-f4d108a9e4a3)] = {a: 0, b: 0},
|
||||
{
|
||||
collection_mutation_description desc;
|
||||
desc.cells.emplace_back(timeuuid_type->decompose(utils::UUID("7fb27e80-7b12-11ea-9fad-f4d108a9e4a3")),
|
||||
atomic_cell::make_live(*ut, write_timestamp, ut->decompose(ut_val), atomic_cell::collection_member::yes));
|
||||
mut.set_clustered_cell(ckey, *l_cdef, desc.serialize(*l_type));
|
||||
}
|
||||
|
||||
// fl = [{a: 0, b: 0}]
|
||||
mut.set_clustered_cell(ckey, *fl_cdef, atomic_cell::make_live(*fl_type, write_timestamp, fl_type->decompose(fl_val)));
|
||||
|
||||
sstable_assertions sst(s, LEGACY_UDT_IN_COLLECTION_PATH);
|
||||
sst.load();
|
||||
assert_that(sst.read_rows_flat()).produces(mut).produces_end_of_stream();
|
||||
}
|
||||
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
3519784297
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,9 @@
|
||||
Scylla.db
|
||||
CRC.db
|
||||
Filter.db
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Digest.crc32
|
||||
Index.db
|
||||
Summary.db
|
||||
Data.db
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
547038858
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,9 @@
|
||||
CompressionInfo.db
|
||||
Filter.db
|
||||
Data.db
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Digest.crc32
|
||||
Scylla.db
|
||||
Index.db
|
||||
Summary.db
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
579241509
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,9 @@
|
||||
CompressionInfo.db
|
||||
Filter.db
|
||||
Data.db
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Digest.crc32
|
||||
Scylla.db
|
||||
Index.db
|
||||
Summary.db
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
2172984605
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,9 @@
|
||||
CompressionInfo.db
|
||||
Filter.db
|
||||
Data.db
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Digest.crc32
|
||||
Scylla.db
|
||||
Index.db
|
||||
Summary.db
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
2014320564
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user