mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-13 03:12:13 +00:00
Compare commits
64 Commits
scylla-3.1
...
next-3.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9b46b9f1a8 | ||
|
|
93da2e2ff0 | ||
|
|
b764db3f1c | ||
|
|
304d339193 | ||
|
|
9f7ba4203d | ||
|
|
8b6a792f81 | ||
|
|
8a94f6b180 | ||
|
|
27209a5b2e | ||
|
|
c25f627a6e | ||
|
|
58b1bdc20c | ||
|
|
e1a7df174c | ||
|
|
6c39e17838 | ||
|
|
507d763f45 | ||
|
|
b042e27f0a | ||
|
|
375ce345a3 | ||
|
|
493b821dfa | ||
|
|
e1999c76b2 | ||
|
|
4791b726a0 | ||
|
|
d7354a5b8d | ||
|
|
6815b72b06 | ||
|
|
efc2df8ca3 | ||
|
|
dbe90f131f | ||
|
|
31d5d16c3d | ||
|
|
b0d122f9c5 | ||
|
|
9a10e4a245 | ||
|
|
871d1ebdd5 | ||
|
|
bff996959d | ||
|
|
1bdc83540b | ||
|
|
478c35e07a | ||
|
|
ba968ab9ec | ||
|
|
883b5e8395 | ||
|
|
b47033676a | ||
|
|
67e45b73f0 | ||
|
|
37eac75b6f | ||
|
|
e8431a3474 | ||
|
|
9d78d848e6 | ||
|
|
32aa6ddd7e | ||
|
|
74cc9477af | ||
|
|
95acf71680 | ||
|
|
921f8baf00 | ||
|
|
071d7d9210 | ||
|
|
769b9bbe59 | ||
|
|
d4e553c153 | ||
|
|
d983411488 | ||
|
|
27de1bb8e6 | ||
|
|
854f8ccb40 | ||
|
|
a68170c9a3 | ||
|
|
7e4bcf2c0f | ||
|
|
a74b3a182e | ||
|
|
e9bc579565 | ||
|
|
ad46bf06a7 | ||
|
|
1ff21a28b7 | ||
|
|
fb3dfaa736 | ||
|
|
5a02e6976f | ||
|
|
5202eea7a7 | ||
|
|
038733f1a5 | ||
|
|
0ed2e90925 | ||
|
|
9ee6d2bc15 | ||
|
|
23582a2ce9 | ||
|
|
5ddf0ec1df | ||
|
|
e6eb54af90 | ||
|
|
f5a869966a | ||
|
|
0c70cd626b | ||
|
|
0928aa4791 |
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=3.1.1
|
||||
VERSION=3.1.4
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -61,6 +61,16 @@ make_now_fct() {
|
||||
});
|
||||
}
|
||||
|
||||
static int64_t get_valid_timestamp(const data_value& ts_obj) {
|
||||
auto ts = value_cast<db_clock::time_point>(ts_obj);
|
||||
int64_t ms = ts.time_since_epoch().count();
|
||||
auto nanos_since = utils::UUID_gen::make_nanos_since(ms);
|
||||
if (!utils::UUID_gen::is_valid_nanos_since(nanos_since)) {
|
||||
throw exceptions::server_exception(format("{}: timestamp is out of range. Must be in milliseconds since epoch", ms));
|
||||
}
|
||||
return ms;
|
||||
}
|
||||
|
||||
inline
|
||||
shared_ptr<function>
|
||||
make_min_timeuuid_fct() {
|
||||
@@ -74,8 +84,7 @@ make_min_timeuuid_fct() {
|
||||
if (ts_obj.is_null()) {
|
||||
return {};
|
||||
}
|
||||
auto ts = value_cast<db_clock::time_point>(ts_obj);
|
||||
auto uuid = utils::UUID_gen::min_time_UUID(ts.time_since_epoch().count());
|
||||
auto uuid = utils::UUID_gen::min_time_UUID(get_valid_timestamp(ts_obj));
|
||||
return {timeuuid_type->decompose(uuid)};
|
||||
});
|
||||
}
|
||||
@@ -85,7 +94,6 @@ shared_ptr<function>
|
||||
make_max_timeuuid_fct() {
|
||||
return make_native_scalar_function<true>("maxtimeuuid", timeuuid_type, { timestamp_type },
|
||||
[] (cql_serialization_format sf, const std::vector<bytes_opt>& values) -> bytes_opt {
|
||||
// FIXME: should values be a vector<optional<bytes>>?
|
||||
auto& bb = values[0];
|
||||
if (!bb) {
|
||||
return {};
|
||||
@@ -94,12 +102,22 @@ make_max_timeuuid_fct() {
|
||||
if (ts_obj.is_null()) {
|
||||
return {};
|
||||
}
|
||||
auto ts = value_cast<db_clock::time_point>(ts_obj);
|
||||
auto uuid = utils::UUID_gen::max_time_UUID(ts.time_since_epoch().count());
|
||||
auto uuid = utils::UUID_gen::max_time_UUID(get_valid_timestamp(ts_obj));
|
||||
return {timeuuid_type->decompose(uuid)};
|
||||
});
|
||||
}
|
||||
|
||||
inline utils::UUID get_valid_timeuuid(bytes raw) {
|
||||
if (!utils::UUID_gen::is_valid_UUID(raw)) {
|
||||
throw exceptions::server_exception(format("invalid timeuuid: size={}", raw.size()));
|
||||
}
|
||||
auto uuid = utils::UUID_gen::get_UUID(raw);
|
||||
if (!uuid.is_timestamp()) {
|
||||
throw exceptions::server_exception(format("{}: Not a timeuuid: version={}", uuid, uuid.version()));
|
||||
}
|
||||
return uuid;
|
||||
}
|
||||
|
||||
inline
|
||||
shared_ptr<function>
|
||||
make_date_of_fct() {
|
||||
@@ -110,7 +128,7 @@ make_date_of_fct() {
|
||||
if (!bb) {
|
||||
return {};
|
||||
}
|
||||
auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(UUID_gen::get_UUID(*bb))));
|
||||
auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(get_valid_timeuuid(*bb))));
|
||||
return {timestamp_type->decompose(ts)};
|
||||
});
|
||||
}
|
||||
@@ -125,7 +143,7 @@ make_unix_timestamp_of_fct() {
|
||||
if (!bb) {
|
||||
return {};
|
||||
}
|
||||
return {long_type->decompose(UUID_gen::unix_timestamp(UUID_gen::get_UUID(*bb)))};
|
||||
return {long_type->decompose(UUID_gen::unix_timestamp(get_valid_timeuuid(*bb)))};
|
||||
});
|
||||
}
|
||||
|
||||
@@ -176,7 +194,7 @@ make_timeuuidtodate_fct() {
|
||||
if (!bb) {
|
||||
return {};
|
||||
}
|
||||
auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(UUID_gen::get_UUID(*bb))));
|
||||
auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(get_valid_timeuuid(*bb))));
|
||||
auto to_simple_date = get_castas_fctn(simple_date_type, timestamp_type);
|
||||
return {simple_date_type->decompose(to_simple_date(ts))};
|
||||
});
|
||||
@@ -211,7 +229,7 @@ make_timeuuidtotimestamp_fct() {
|
||||
if (!bb) {
|
||||
return {};
|
||||
}
|
||||
auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(UUID_gen::get_UUID(*bb))));
|
||||
auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(get_valid_timeuuid(*bb))));
|
||||
return {timestamp_type->decompose(ts)};
|
||||
});
|
||||
}
|
||||
@@ -245,10 +263,14 @@ make_timeuuidtounixtimestamp_fct() {
|
||||
if (!bb) {
|
||||
return {};
|
||||
}
|
||||
return {long_type->decompose(UUID_gen::unix_timestamp(UUID_gen::get_UUID(*bb)))};
|
||||
return {long_type->decompose(UUID_gen::unix_timestamp(get_valid_timeuuid(*bb)))};
|
||||
});
|
||||
}
|
||||
|
||||
inline bytes time_point_to_long(const data_value& v) {
|
||||
return data_value(get_valid_timestamp(v)).serialize();
|
||||
}
|
||||
|
||||
inline
|
||||
shared_ptr<function>
|
||||
make_timestamptounixtimestamp_fct() {
|
||||
@@ -263,7 +285,7 @@ make_timestamptounixtimestamp_fct() {
|
||||
if (ts_obj.is_null()) {
|
||||
return {};
|
||||
}
|
||||
return {long_type->decompose(ts_obj)};
|
||||
return time_point_to_long(ts_obj);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -282,7 +304,7 @@ make_datetounixtimestamp_fct() {
|
||||
return {};
|
||||
}
|
||||
auto from_simple_date = get_castas_fctn(timestamp_type, simple_date_type);
|
||||
return {long_type->decompose(from_simple_date(simple_date_obj))};
|
||||
return time_point_to_long(from_simple_date(simple_date_obj));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -130,6 +130,18 @@ query_options::query_options(std::unique_ptr<query_options> qo, ::shared_ptr<ser
|
||||
|
||||
}
|
||||
|
||||
query_options::query_options(std::unique_ptr<query_options> qo, ::shared_ptr<service::pager::paging_state> paging_state, int32_t page_size)
|
||||
: query_options(qo->_consistency,
|
||||
qo->get_timeout_config(),
|
||||
std::move(qo->_names),
|
||||
std::move(qo->_values),
|
||||
std::move(qo->_value_views),
|
||||
qo->_skip_metadata,
|
||||
std::move(query_options::specific_options{page_size, paging_state, qo->_options.serial_consistency, qo->_options.timestamp}),
|
||||
qo->_cql_serialization_format) {
|
||||
|
||||
}
|
||||
|
||||
query_options::query_options(std::vector<cql3::raw_value> values)
|
||||
: query_options(
|
||||
db::consistency_level::ONE, infinite_timeout_config, std::move(values))
|
||||
|
||||
@@ -102,7 +102,7 @@ private:
|
||||
|
||||
public:
|
||||
query_options(query_options&&) = default;
|
||||
query_options(const query_options&) = delete;
|
||||
explicit query_options(const query_options&) = default;
|
||||
|
||||
explicit query_options(db::consistency_level consistency,
|
||||
const timeout_config& timeouts,
|
||||
@@ -155,6 +155,7 @@ public:
|
||||
explicit query_options(db::consistency_level, const timeout_config& timeouts,
|
||||
std::vector<cql3::raw_value> values, specific_options options = specific_options::DEFAULT);
|
||||
explicit query_options(std::unique_ptr<query_options>, ::shared_ptr<service::pager::paging_state> paging_state);
|
||||
explicit query_options(std::unique_ptr<query_options>, ::shared_ptr<service::pager::paging_state> paging_state, int32_t page_size);
|
||||
|
||||
const timeout_config& get_timeout_config() const { return _timeout_config; }
|
||||
|
||||
|
||||
@@ -380,28 +380,45 @@ std::vector<const column_definition*> statement_restrictions::get_column_defs_fo
|
||||
if (need_filtering()) {
|
||||
auto& sim = db.find_column_family(_schema).get_index_manager();
|
||||
auto [opt_idx, _] = find_idx(sim);
|
||||
auto column_uses_indexing = [&opt_idx] (const column_definition* cdef) {
|
||||
return opt_idx && opt_idx->depends_on(*cdef);
|
||||
auto column_uses_indexing = [&opt_idx] (const column_definition* cdef, ::shared_ptr<single_column_restriction> restr) {
|
||||
return opt_idx && restr && restr->is_supported_by(*opt_idx);
|
||||
};
|
||||
auto single_pk_restrs = dynamic_pointer_cast<single_column_partition_key_restrictions>(_partition_key_restrictions);
|
||||
if (_partition_key_restrictions->needs_filtering(*_schema)) {
|
||||
for (auto&& cdef : _partition_key_restrictions->get_column_defs()) {
|
||||
if (!column_uses_indexing(cdef)) {
|
||||
::shared_ptr<single_column_restriction> restr;
|
||||
if (single_pk_restrs) {
|
||||
auto it = single_pk_restrs->restrictions().find(cdef);
|
||||
if (it != single_pk_restrs->restrictions().end()) {
|
||||
restr = dynamic_pointer_cast<single_column_restriction>(it->second);
|
||||
}
|
||||
}
|
||||
if (!column_uses_indexing(cdef, restr)) {
|
||||
column_defs_for_filtering.emplace_back(cdef);
|
||||
}
|
||||
}
|
||||
}
|
||||
auto single_ck_restrs = dynamic_pointer_cast<single_column_clustering_key_restrictions>(_clustering_columns_restrictions);
|
||||
const bool pk_has_unrestricted_components = _partition_key_restrictions->has_unrestricted_components(*_schema);
|
||||
if (pk_has_unrestricted_components || _clustering_columns_restrictions->needs_filtering(*_schema)) {
|
||||
column_id first_filtering_id = pk_has_unrestricted_components ? 0 : _schema->clustering_key_columns().begin()->id +
|
||||
_clustering_columns_restrictions->num_prefix_columns_that_need_not_be_filtered();
|
||||
for (auto&& cdef : _clustering_columns_restrictions->get_column_defs()) {
|
||||
if (cdef->id >= first_filtering_id && !column_uses_indexing(cdef)) {
|
||||
::shared_ptr<single_column_restriction> restr;
|
||||
if (single_pk_restrs) {
|
||||
auto it = single_ck_restrs->restrictions().find(cdef);
|
||||
if (it != single_ck_restrs->restrictions().end()) {
|
||||
restr = dynamic_pointer_cast<single_column_restriction>(it->second);
|
||||
}
|
||||
}
|
||||
if (cdef->id >= first_filtering_id && !column_uses_indexing(cdef, restr)) {
|
||||
column_defs_for_filtering.emplace_back(cdef);
|
||||
}
|
||||
}
|
||||
}
|
||||
for (auto&& cdef : _nonprimary_key_restrictions->get_column_defs()) {
|
||||
if (!column_uses_indexing(cdef)) {
|
||||
auto restr = dynamic_pointer_cast<single_column_restriction>(_nonprimary_key_restrictions->get_restriction(*cdef));
|
||||
if (!column_uses_indexing(cdef, restr)) {
|
||||
column_defs_for_filtering.emplace_back(cdef);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -92,6 +92,14 @@ public:
|
||||
: abstract_function_selector(fun, std::move(arg_selectors))
|
||||
, _tfun(dynamic_pointer_cast<T>(fun)) {
|
||||
}
|
||||
|
||||
const functions::function_name& name() const {
|
||||
return _tfun->name();
|
||||
}
|
||||
|
||||
virtual sstring assignment_testable_source_context() const override {
|
||||
return format("{}", this->name());
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -79,11 +79,6 @@ public:
|
||||
dynamic_pointer_cast<functions::aggregate_function>(func), std::move(arg_selectors))
|
||||
, _aggregate(fun()->new_aggregate()) {
|
||||
}
|
||||
|
||||
virtual sstring assignment_testable_source_context() const override {
|
||||
// FIXME:
|
||||
return "FIXME";
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -82,12 +82,6 @@ public:
|
||||
: abstract_function_selector_for<functions::scalar_function>(
|
||||
dynamic_pointer_cast<functions::scalar_function>(std::move(fun)), std::move(arg_selectors)) {
|
||||
}
|
||||
|
||||
virtual sstring assignment_testable_source_context() const override {
|
||||
// FIXME:
|
||||
return "FIXME";
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -142,7 +142,7 @@ shared_ptr<selector::factory>
|
||||
selectable::with_field_selection::new_selector_factory(database& db, schema_ptr s, std::vector<const column_definition*>& defs) {
|
||||
auto&& factory = _selected->new_selector_factory(db, s, defs);
|
||||
auto&& type = factory->new_instance()->get_type();
|
||||
auto&& ut = dynamic_pointer_cast<const user_type_impl>(std::move(type));
|
||||
auto&& ut = dynamic_pointer_cast<const user_type_impl>(type->underlying_type());
|
||||
if (!ut) {
|
||||
throw exceptions::invalid_request_exception(
|
||||
format("Invalid field selection: {} of type {} is not a user type",
|
||||
|
||||
@@ -166,7 +166,8 @@ alter_type_statement::add_or_alter::add_or_alter(const ut_name& name, bool is_ad
|
||||
user_type alter_type_statement::add_or_alter::do_add(database& db, user_type to_update) const
|
||||
{
|
||||
if (get_idx_of_field(to_update, _field_name)) {
|
||||
throw exceptions::invalid_request_exception(format("Cannot add new field {} to type {}: a field of the same name already exists", _field_name->name(), _name.to_string()));
|
||||
throw exceptions::invalid_request_exception(format("Cannot add new field {} to type {}: a field of the same name already exists",
|
||||
_field_name->to_string(), _name.to_string()));
|
||||
}
|
||||
|
||||
std::vector<bytes> new_names(to_update->field_names());
|
||||
@@ -174,7 +175,7 @@ user_type alter_type_statement::add_or_alter::do_add(database& db, user_type to_
|
||||
std::vector<data_type> new_types(to_update->field_types());
|
||||
auto&& add_type = _field_type->prepare(db, keyspace()).get_type();
|
||||
if (add_type->references_user_type(to_update->_keyspace, to_update->_name)) {
|
||||
throw exceptions::invalid_request_exception(format("Cannot add new field {} of type {} to type {} as this would create a circular reference", _field_name->name(), _field_type->to_string(), _name.to_string()));
|
||||
throw exceptions::invalid_request_exception(format("Cannot add new field {} of type {} to type {} as this would create a circular reference", _field_name->to_string(), _field_type->to_string(), _name.to_string()));
|
||||
}
|
||||
new_types.push_back(std::move(add_type));
|
||||
return user_type_impl::get_instance(to_update->_keyspace, to_update->_name, std::move(new_names), std::move(new_types));
|
||||
@@ -184,13 +185,14 @@ user_type alter_type_statement::add_or_alter::do_alter(database& db, user_type t
|
||||
{
|
||||
std::optional<uint32_t> idx = get_idx_of_field(to_update, _field_name);
|
||||
if (!idx) {
|
||||
throw exceptions::invalid_request_exception(format("Unknown field {} in type {}", _field_name->name(), _name.to_string()));
|
||||
throw exceptions::invalid_request_exception(format("Unknown field {} in type {}", _field_name->to_string(), _name.to_string()));
|
||||
}
|
||||
|
||||
auto previous = to_update->field_types()[*idx];
|
||||
auto new_type = _field_type->prepare(db, keyspace()).get_type();
|
||||
if (!new_type->is_compatible_with(*previous)) {
|
||||
throw exceptions::invalid_request_exception(format("Type {} in incompatible with previous type {} of field {} in user type {}", _field_type->to_string(), previous->as_cql3_type().to_string(), _field_name->name(), _name.to_string()));
|
||||
throw exceptions::invalid_request_exception(format("Type {} in incompatible with previous type {} of field {} in user type {}",
|
||||
_field_type->to_string(), previous->as_cql3_type().to_string(), _field_name->to_string(), _name.to_string()));
|
||||
}
|
||||
|
||||
std::vector<data_type> new_types(to_update->field_types());
|
||||
|
||||
@@ -440,8 +440,8 @@ indexed_table_select_statement::prepare_command_for_base_query(const query_optio
|
||||
return cmd;
|
||||
}
|
||||
|
||||
future<shared_ptr<cql_transport::messages::result_message>>
|
||||
indexed_table_select_statement::execute_base_query(
|
||||
future<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>
|
||||
indexed_table_select_statement::do_execute_base_query(
|
||||
service::storage_proxy& proxy,
|
||||
dht::partition_range_vector&& partition_ranges,
|
||||
service::query_state& state,
|
||||
@@ -492,22 +492,27 @@ indexed_table_select_statement::execute_base_query(
|
||||
}).then([&merger]() {
|
||||
return merger.get();
|
||||
});
|
||||
}).then([this, &proxy, &state, &options, now, cmd, paging_state = std::move(paging_state)] (foreign_ptr<lw_shared_ptr<query::result>> result) mutable {
|
||||
return this->process_base_query_results(std::move(result), cmd, proxy, state, options, now, std::move(paging_state));
|
||||
}).then([cmd] (foreign_ptr<lw_shared_ptr<query::result>> result) mutable {
|
||||
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>(std::move(result), std::move(cmd));
|
||||
});
|
||||
}
|
||||
|
||||
// Function for fetching the selected columns from a list of clustering rows.
|
||||
// It is currently used only in our Secondary Index implementation - ordinary
|
||||
// CQL SELECT statements do not have the syntax to request a list of rows.
|
||||
// FIXME: The current implementation is very inefficient - it requests each
|
||||
// row separately (and, incrementally, in parallel). Even multiple rows from a single
|
||||
// partition are requested separately. This last case can be easily improved,
|
||||
// but to implement the general case (multiple rows from multiple partitions)
|
||||
// efficiently, we will need more support from other layers.
|
||||
// Keys are ordered in token order (see #3423)
|
||||
future<shared_ptr<cql_transport::messages::result_message>>
|
||||
indexed_table_select_statement::execute_base_query(
|
||||
service::storage_proxy& proxy,
|
||||
dht::partition_range_vector&& partition_ranges,
|
||||
service::query_state& state,
|
||||
const query_options& options,
|
||||
gc_clock::time_point now,
|
||||
::shared_ptr<const service::pager::paging_state> paging_state) {
|
||||
return do_execute_base_query(proxy, std::move(partition_ranges), state, options, now, paging_state).then(
|
||||
[this, &proxy, &state, &options, now, paging_state = std::move(paging_state)] (foreign_ptr<lw_shared_ptr<query::result>> result, lw_shared_ptr<query::read_command> cmd) {
|
||||
return process_base_query_results(std::move(result), std::move(cmd), proxy, state, options, now, std::move(paging_state));
|
||||
});
|
||||
}
|
||||
|
||||
future<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>
|
||||
indexed_table_select_statement::do_execute_base_query(
|
||||
service::storage_proxy& proxy,
|
||||
std::vector<primary_key>&& primary_keys,
|
||||
service::query_state& state,
|
||||
@@ -562,9 +567,23 @@ indexed_table_select_statement::execute_base_query(
|
||||
});
|
||||
}).then([&merger] () {
|
||||
return merger.get();
|
||||
}).then([cmd] (foreign_ptr<lw_shared_ptr<query::result>> result) mutable {
|
||||
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>(std::move(result), std::move(cmd));
|
||||
});
|
||||
}).then([this, &proxy, &state, &options, now, cmd, paging_state = std::move(paging_state)] (foreign_ptr<lw_shared_ptr<query::result>> result) mutable {
|
||||
return this->process_base_query_results(std::move(result), cmd, proxy, state, options, now, std::move(paging_state));
|
||||
});
|
||||
}
|
||||
|
||||
future<shared_ptr<cql_transport::messages::result_message>>
|
||||
indexed_table_select_statement::execute_base_query(
|
||||
service::storage_proxy& proxy,
|
||||
std::vector<primary_key>&& primary_keys,
|
||||
service::query_state& state,
|
||||
const query_options& options,
|
||||
gc_clock::time_point now,
|
||||
::shared_ptr<const service::pager::paging_state> paging_state) {
|
||||
return do_execute_base_query(proxy, std::move(primary_keys), state, options, now, paging_state).then(
|
||||
[this, &proxy, &state, &options, now, paging_state = std::move(paging_state)] (foreign_ptr<lw_shared_ptr<query::result>> result, lw_shared_ptr<query::read_command> cmd) {
|
||||
return process_base_query_results(std::move(result), std::move(cmd), proxy, state, options, now, std::move(paging_state));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -868,6 +887,60 @@ indexed_table_select_statement::do_execute(service::storage_proxy& proxy,
|
||||
}
|
||||
}
|
||||
|
||||
// Aggregated and paged filtering needs to aggregate the results from all pages
|
||||
// in order to avoid returning partial per-page results (issue #4540).
|
||||
// It's a little bit more complicated than regular aggregation, because each paging state
|
||||
// needs to be translated between the base table and the underlying view.
|
||||
// The routine below keeps fetching pages from the underlying view, which are then
|
||||
// used to fetch base rows, which go straight to the result set builder.
|
||||
// A local, internal copy of query_options is kept in order to keep updating
|
||||
// the paging state between requesting data from replicas.
|
||||
const bool aggregate = _selection->is_aggregate();
|
||||
if (aggregate) {
|
||||
const bool restrictions_need_filtering = _restrictions->need_filtering();
|
||||
return do_with(cql3::selection::result_set_builder(*_selection, now, options.get_cql_serialization_format()), std::make_unique<cql3::query_options>(cql3::query_options(options)),
|
||||
[this, &options, &proxy, &state, now, whole_partitions, partition_slices, restrictions_need_filtering] (cql3::selection::result_set_builder& builder, std::unique_ptr<cql3::query_options>& internal_options) {
|
||||
// page size is set to the internal count page size, regardless of the user-provided value
|
||||
internal_options.reset(new cql3::query_options(std::move(internal_options), options.get_paging_state(), DEFAULT_COUNT_PAGE_SIZE));
|
||||
return repeat([this, &builder, &options, &internal_options, &proxy, &state, now, whole_partitions, partition_slices, restrictions_need_filtering] () {
|
||||
auto consume_results = [this, &builder, &options, &internal_options, restrictions_need_filtering] (foreign_ptr<lw_shared_ptr<query::result>> results, lw_shared_ptr<query::read_command> cmd) {
|
||||
if (restrictions_need_filtering) {
|
||||
query::result_view::consume(*results, cmd->slice, cql3::selection::result_set_builder::visitor(builder, *_schema, *_selection,
|
||||
cql3::selection::result_set_builder::restrictions_filter(_restrictions, options, cmd->row_limit, _schema, cmd->slice.partition_row_limit())));
|
||||
} else {
|
||||
query::result_view::consume(*results, cmd->slice, cql3::selection::result_set_builder::visitor(builder, *_schema, *_selection));
|
||||
}
|
||||
};
|
||||
|
||||
if (whole_partitions || partition_slices) {
|
||||
return find_index_partition_ranges(proxy, state, *internal_options).then(
|
||||
[this, now, &state, &internal_options, &proxy, consume_results = std::move(consume_results)] (dht::partition_range_vector partition_ranges, ::shared_ptr<const service::pager::paging_state> paging_state) {
|
||||
bool has_more_pages = paging_state && paging_state->get_remaining() > 0;
|
||||
internal_options.reset(new cql3::query_options(std::move(internal_options), paging_state ? ::make_shared<service::pager::paging_state>(*paging_state) : nullptr));
|
||||
return do_execute_base_query(proxy, std::move(partition_ranges), state, *internal_options, now, std::move(paging_state)).then(consume_results).then([has_more_pages] {
|
||||
return stop_iteration(!has_more_pages);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
return find_index_clustering_rows(proxy, state, *internal_options).then(
|
||||
[this, now, &state, &internal_options, &proxy, consume_results = std::move(consume_results)] (std::vector<primary_key> primary_keys, ::shared_ptr<const service::pager::paging_state> paging_state) {
|
||||
bool has_more_pages = paging_state && paging_state->get_remaining() > 0;
|
||||
internal_options.reset(new cql3::query_options(std::move(internal_options), paging_state ? ::make_shared<service::pager::paging_state>(*paging_state) : nullptr));
|
||||
return this->do_execute_base_query(proxy, std::move(primary_keys), state, *internal_options, now, std::move(paging_state)).then(consume_results).then([has_more_pages] {
|
||||
return stop_iteration(!has_more_pages);
|
||||
});
|
||||
});
|
||||
}
|
||||
}).then([this, &builder, restrictions_need_filtering] () {
|
||||
auto rs = builder.build();
|
||||
update_stats_rows_read(rs->size());
|
||||
_stats.filtered_rows_matched_total += restrictions_need_filtering ? rs->size() : 0;
|
||||
auto msg = ::make_shared<cql_transport::messages::result_message::rows>(result(std::move(rs)));
|
||||
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(std::move(msg));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
if (whole_partitions || partition_slices) {
|
||||
// In this case, can use our normal query machinery, which retrieves
|
||||
// entire partitions or the same slice for many partitions.
|
||||
|
||||
@@ -68,8 +68,8 @@ class select_statement : public cql_statement {
|
||||
public:
|
||||
using parameters = raw::select_statement::parameters;
|
||||
using ordering_comparator_type = raw::select_statement::ordering_comparator_type;
|
||||
protected:
|
||||
static constexpr int DEFAULT_COUNT_PAGE_SIZE = 10000;
|
||||
protected:
|
||||
static thread_local const ::shared_ptr<parameters> _default_parameters;
|
||||
schema_ptr _schema;
|
||||
uint32_t _bound_terms;
|
||||
@@ -229,6 +229,14 @@ private:
|
||||
lw_shared_ptr<query::read_command>
|
||||
prepare_command_for_base_query(const query_options& options, service::query_state& state, gc_clock::time_point now, bool use_paging);
|
||||
|
||||
future<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>
|
||||
do_execute_base_query(
|
||||
service::storage_proxy& proxy,
|
||||
dht::partition_range_vector&& partition_ranges,
|
||||
service::query_state& state,
|
||||
const query_options& options,
|
||||
gc_clock::time_point now,
|
||||
::shared_ptr<const service::pager::paging_state> paging_state);
|
||||
future<shared_ptr<cql_transport::messages::result_message>>
|
||||
execute_base_query(
|
||||
service::storage_proxy& proxy,
|
||||
@@ -238,6 +246,23 @@ private:
|
||||
gc_clock::time_point now,
|
||||
::shared_ptr<const service::pager::paging_state> paging_state);
|
||||
|
||||
// Function for fetching the selected columns from a list of clustering rows.
|
||||
// It is currently used only in our Secondary Index implementation - ordinary
|
||||
// CQL SELECT statements do not have the syntax to request a list of rows.
|
||||
// FIXME: The current implementation is very inefficient - it requests each
|
||||
// row separately (and, incrementally, in parallel). Even multiple rows from a single
|
||||
// partition are requested separately. This last case can be easily improved,
|
||||
// but to implement the general case (multiple rows from multiple partitions)
|
||||
// efficiently, we will need more support from other layers.
|
||||
// Keys are ordered in token order (see #3423)
|
||||
future<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>
|
||||
do_execute_base_query(
|
||||
service::storage_proxy& proxy,
|
||||
std::vector<primary_key>&& primary_keys,
|
||||
service::query_state& state,
|
||||
const query_options& options,
|
||||
gc_clock::time_point now,
|
||||
::shared_ptr<const service::pager::paging_state> paging_state);
|
||||
future<shared_ptr<cql_transport::messages::result_message>>
|
||||
execute_base_query(
|
||||
service::storage_proxy& proxy,
|
||||
|
||||
@@ -32,7 +32,7 @@ tuples::component_spec_of(shared_ptr<column_specification> column, size_t compon
|
||||
column->ks_name,
|
||||
column->cf_name,
|
||||
::make_shared<column_identifier>(format("{}[{:d}]", column->name, component), true),
|
||||
static_pointer_cast<const tuple_type_impl>(column->type)->type(component));
|
||||
static_pointer_cast<const tuple_type_impl>(column->type->underlying_type())->type(component));
|
||||
}
|
||||
|
||||
shared_ptr<term>
|
||||
|
||||
@@ -70,7 +70,7 @@ public:
|
||||
|
||||
private:
|
||||
void validate_assignable_to(database& db, const sstring& keyspace, shared_ptr<column_specification> receiver) {
|
||||
auto tt = dynamic_pointer_cast<const tuple_type_impl>(receiver->type);
|
||||
auto tt = dynamic_pointer_cast<const tuple_type_impl>(receiver->type->underlying_type());
|
||||
if (!tt) {
|
||||
throw exceptions::invalid_request_exception(format("Invalid tuple type literal for {} of type {}", receiver->name, receiver->type->as_cql3_type()));
|
||||
}
|
||||
|
||||
@@ -260,6 +260,10 @@ void backlog_controller::adjust() {
|
||||
|
||||
float backlog_controller::backlog_of_shares(float shares) const {
|
||||
size_t idx = 1;
|
||||
// No control points means the controller is disabled.
|
||||
if (_control_points.size() == 0) {
|
||||
return 1.0f;
|
||||
}
|
||||
while ((idx < _control_points.size() - 1) && (_control_points[idx].output < shares)) {
|
||||
idx++;
|
||||
}
|
||||
@@ -1963,7 +1967,8 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
|
||||
return make_multishard_combining_reader(make_shared<streaming_reader_lifecycle_policy>(db), partitioner, std::move(s), pr, ps, pc,
|
||||
std::move(trace_state), fwd_mr);
|
||||
});
|
||||
return make_flat_multi_range_reader(std::move(schema), std::move(ms), std::move(range_generator), schema->full_slice(),
|
||||
auto&& full_slice = schema->full_slice();
|
||||
return make_flat_multi_range_reader(std::move(schema), std::move(ms), std::move(range_generator), std::move(full_slice),
|
||||
service::get_local_streaming_read_priority(), {}, mutation_reader::forwarding::no);
|
||||
}
|
||||
|
||||
|
||||
@@ -1236,6 +1236,34 @@ void db::commitlog::segment_manager::flush_segments(bool force) {
|
||||
}
|
||||
}
|
||||
|
||||
/// \brief Helper for ensuring a file is closed if an exception is thrown.
|
||||
///
|
||||
/// The file provided by the file_fut future is passed to func.
|
||||
/// * If func throws an exception E, the file is closed and we return
|
||||
/// a failed future with E.
|
||||
/// * If func returns a value V, the file is not closed and we return
|
||||
/// a future with V.
|
||||
/// Note that when an exception is not thrown, it is the
|
||||
/// responsibility of func to make sure the file will be closed. It
|
||||
/// can close the file itself, return it, or store it somewhere.
|
||||
///
|
||||
/// \tparam Func The type of function this wraps
|
||||
/// \param file_fut A future that produces a file
|
||||
/// \param func A function that uses a file
|
||||
/// \return A future that passes the file produced by file_fut to func
|
||||
/// and closes it if func fails
|
||||
template <typename Func>
|
||||
static auto close_on_failure(future<file> file_fut, Func func) {
|
||||
return file_fut.then([func = std::move(func)](file f) {
|
||||
return futurize_apply(func, f).handle_exception([f] (std::exception_ptr e) mutable {
|
||||
return f.close().then_wrapped([f, e = std::move(e)] (future<> x) {
|
||||
using futurator = futurize<std::result_of_t<Func(file)>>;
|
||||
return futurator::make_exception_future(e);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager::allocate_segment_ex(const descriptor& d, sstring filename, open_flags flags, bool active) {
|
||||
file_open_options opt;
|
||||
opt.extent_allocation_size_hint = max_size;
|
||||
@@ -1262,7 +1290,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
|
||||
return fut;
|
||||
});
|
||||
|
||||
return fut.then([this, d, active, filename](file f) {
|
||||
return close_on_failure(std::move(fut), [this, d, active, filename] (file f) {
|
||||
f = make_checked_file(commit_error_handler, f);
|
||||
// xfs doesn't like files extended betond eof, so enlarge the file
|
||||
return f.truncate(max_size).then([this, d, active, f, filename] () mutable {
|
||||
|
||||
@@ -735,6 +735,7 @@ public:
|
||||
val(shutdown_announce_in_ms, uint32_t, 2 * 1000, Used, "Time a node waits after sending gossip shutdown message in milliseconds. Same as -Dcassandra.shutdown_announce_in_ms in cassandra.") \
|
||||
val(developer_mode, bool, false, Used, "Relax environment checks. Setting to true can reduce performance and reliability significantly.") \
|
||||
val(skip_wait_for_gossip_to_settle, int32_t, -1, Used, "An integer to configure the wait for gossip to settle. -1: wait normally, 0: do not wait at all, n: wait for at most n polls. Same as -Dcassandra.skip_wait_for_gossip_to_settle in cassandra.") \
|
||||
val(force_gossip_generation, int32_t, -1, Used, "Force gossip to use the generation number provided by user") \
|
||||
val(experimental, bool, false, Used, "Set to true to unlock experimental features.") \
|
||||
val(lsa_reclamation_step, size_t, 1, Used, "Minimum number of segments to reclaim in a single step") \
|
||||
val(prometheus_port, uint16_t, 9180, Used, "Prometheus port, set to zero to disable") \
|
||||
|
||||
@@ -405,11 +405,8 @@ future<> manager::end_point_hints_manager::sender::do_send_one_mutation(frozen_m
|
||||
return _proxy.send_to_endpoint(std::move(m), end_point_key(), { }, write_type::SIMPLE, service::allow_hints::no);
|
||||
} else {
|
||||
manager_logger.trace("Endpoints set has changed and {} is no longer a replica. Mutating from scratch...", end_point_key());
|
||||
// FIXME: using 1h as infinite timeout. If a node is down, we should get an
|
||||
// unavailable exception.
|
||||
auto timeout = db::timeout_clock::now() + 1h;
|
||||
//FIXME: Add required frozen_mutation overloads
|
||||
return _proxy.mutate({m.fm.unfreeze(m.s)}, consistency_level::ALL, timeout, nullptr);
|
||||
return _proxy.mutate_hint_from_scratch(std::move(m));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1092,10 +1092,31 @@ static std::vector<V> get_list(const query::result_set_row& row, const sstring&
|
||||
// Create types for a given keyspace. This takes care of topologically sorting user defined types.
|
||||
template <typename T> static std::vector<user_type> create_types(keyspace_metadata& ks, T&& range) {
|
||||
cql_type_parser::raw_builder builder(ks);
|
||||
std::unordered_set<bytes> names;
|
||||
for (const query::result_set_row& row : range) {
|
||||
builder.add(row.get_nonnull<sstring>("type_name"),
|
||||
get_list<sstring>(row, "field_names"),
|
||||
get_list<sstring>(row, "field_types"));
|
||||
auto name = row.get_nonnull<sstring>("type_name");
|
||||
names.insert(to_bytes(name));
|
||||
builder.add(std::move(name), get_list<sstring>(row, "field_names"), get_list<sstring>(row, "field_types"));
|
||||
}
|
||||
// Add user types that use any of the above types. From the
|
||||
// database point of view they haven't changed since the content
|
||||
// of system.types is the same for them. The runtime objects in
|
||||
// the other hand now point to out of date types, so we need to
|
||||
// recreate them.
|
||||
for (const auto& p : ks.user_types()->get_all_types()) {
|
||||
const user_type& t = p.second;
|
||||
if (names.count(t->_name) != 0) {
|
||||
continue;
|
||||
}
|
||||
for (const auto& name : names) {
|
||||
if (t->references_user_type(t->_keyspace, name)) {
|
||||
std::vector<sstring> field_types;
|
||||
for (const data_type& f : t->field_types()) {
|
||||
field_types.push_back(f->as_cql3_type().to_string());
|
||||
}
|
||||
builder.add(t->get_name_as_string(), t->string_field_names(), std::move(field_types));
|
||||
}
|
||||
}
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@@ -44,6 +44,11 @@ namespace db::view {
|
||||
// columns. When reading the results from the scylla_views_builds_in_progress
|
||||
// table, we adjust the clustering key (we shed the cpu_id column) and map
|
||||
// back the regular columns.
|
||||
// Since mutation fragment consumers expect clustering_row fragments
|
||||
// not to be duplicated for given primary key, previous clustering key
|
||||
// is stored between mutation fragments. If the clustering key becomes
|
||||
// the same as the previous one (as a result of trimming cpu_id),
|
||||
// the duplicated fragment is ignored.
|
||||
class build_progress_virtual_reader {
|
||||
database& _db;
|
||||
|
||||
@@ -55,6 +60,7 @@ class build_progress_virtual_reader {
|
||||
const query::partition_slice& _legacy_slice;
|
||||
query::partition_slice _slice;
|
||||
flat_mutation_reader _underlying;
|
||||
std::optional<clustering_key> _previous_clustering_key;
|
||||
|
||||
build_progress_reader(
|
||||
schema_ptr legacy_schema,
|
||||
@@ -79,7 +85,8 @@ class build_progress_virtual_reader {
|
||||
pc,
|
||||
std::move(trace_state),
|
||||
fwd,
|
||||
fwd_mr)) {
|
||||
fwd_mr))
|
||||
, _previous_clustering_key() {
|
||||
}
|
||||
|
||||
const schema& underlying_schema() const {
|
||||
@@ -127,8 +134,13 @@ class build_progress_virtual_reader {
|
||||
legacy_in_progress_row.append_cell(_legacy_generation_number_col, std::move(c));
|
||||
}
|
||||
});
|
||||
auto ck = adjust_ckey(scylla_in_progress_row.key());
|
||||
if (_previous_clustering_key && ck.equal(*_schema, *_previous_clustering_key)) {
|
||||
continue;
|
||||
}
|
||||
_previous_clustering_key = ck;
|
||||
mf = clustering_row(
|
||||
adjust_ckey(scylla_in_progress_row.key()),
|
||||
std::move(ck),
|
||||
std::move(scylla_in_progress_row.tomb()),
|
||||
std::move(scylla_in_progress_row.marker()),
|
||||
std::move(legacy_in_progress_row));
|
||||
@@ -140,6 +152,8 @@ class build_progress_virtual_reader {
|
||||
adjust_ckey(scylla_in_progress_rt.end),
|
||||
scylla_in_progress_rt.end_kind,
|
||||
scylla_in_progress_rt.tomb);
|
||||
} else if (mf.is_end_of_partition()) {
|
||||
_previous_clustering_key.reset();
|
||||
}
|
||||
push_mutation_fragment(std::move(mf));
|
||||
}
|
||||
@@ -192,4 +206,4 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -83,7 +83,7 @@ view_info::view_info(const schema& schema, const raw_view_info& raw_view_info)
|
||||
cql3::statements::select_statement& view_info::select_statement() const {
|
||||
if (!_select_statement) {
|
||||
shared_ptr<cql3::statements::raw::select_statement> raw;
|
||||
if (is_index()) {
|
||||
if (service::get_local_storage_service().db().local().find_column_family(base_id()).get_index_manager().is_global_index(_schema)) {
|
||||
// Token column is the first clustering column
|
||||
auto token_column_it = boost::range::find_if(_schema.all_columns(), std::mem_fn(&column_definition::is_clustering_key));
|
||||
auto real_columns = _schema.all_columns() | boost::adaptors::filtered([this, token_column_it](const column_definition& cdef) {
|
||||
@@ -449,7 +449,7 @@ void create_virtual_column(schema_builder& builder, const bytes& name, const dat
|
||||
// A map has keys and values. We don't need these values,
|
||||
// and can use empty values instead.
|
||||
auto mtype = dynamic_pointer_cast<const map_type_impl>(type);
|
||||
builder.with_column(name, map_type_impl::get_instance(mtype->get_values_type(), empty_type, true), column_kind::regular_column, column_view_virtual::yes);
|
||||
builder.with_column(name, map_type_impl::get_instance(mtype->get_keys_type(), empty_type, true), column_kind::regular_column, column_view_virtual::yes);
|
||||
} else if (ctype->is_set()) {
|
||||
// A set's cell has nothing beyond the keys, so the
|
||||
// virtual version of a set is, unfortunately, a complete
|
||||
|
||||
2
dist/common/scripts/scylla_util.py
vendored
2
dist/common/scripts/scylla_util.py
vendored
@@ -476,6 +476,8 @@ def create_perftune_conf(nic='eth0'):
|
||||
|
||||
|
||||
def is_valid_nic(nic):
|
||||
if len(nic) == 0:
|
||||
return False
|
||||
return os.path.exists('/sys/class/net/{}'.format(nic))
|
||||
|
||||
# Remove this when we do not support SET_NIC configuration value anymore
|
||||
|
||||
2
dist/debian/build_deb.sh
vendored
2
dist/debian/build_deb.sh
vendored
@@ -125,7 +125,7 @@ if [ -z "$TARGET" ]; then
|
||||
fi
|
||||
RELOC_PKG_FULLPATH=$(readlink -f $RELOC_PKG)
|
||||
RELOC_PKG_BASENAME=$(basename $RELOC_PKG)
|
||||
SCYLLA_VERSION=$(cat SCYLLA-VERSION-FILE)
|
||||
SCYLLA_VERSION=$(cat SCYLLA-VERSION-FILE | sed 's/\.rc/~rc/')
|
||||
SCYLLA_RELEASE=$(cat SCYLLA-RELEASE-FILE)
|
||||
|
||||
ln -fv $RELOC_PKG_FULLPATH ../$PRODUCT-server_$SCYLLA_VERSION-$SCYLLA_RELEASE.orig.tar.gz
|
||||
|
||||
6
dist/debian/debian/scylla-server.postrm
vendored
6
dist/debian/debian/scylla-server.postrm
vendored
@@ -4,7 +4,11 @@ set -e
|
||||
|
||||
case "$1" in
|
||||
purge|remove)
|
||||
rm -rf /etc/systemd/system/scylla-server.service.d/
|
||||
# We need to keep dependencies.conf and sysconfdir.conf on 'remove',
|
||||
# otherwise it will be missing after rollback.
|
||||
if [ "$1" = "purge" ]; then
|
||||
rm -rf /etc/systemd/system/scylla-server.service.d/
|
||||
fi
|
||||
;;
|
||||
esac
|
||||
|
||||
|
||||
1
dist/debian/scylla-server.install.mustache
vendored
1
dist/debian/scylla-server.install.mustache
vendored
@@ -15,3 +15,4 @@ dist/common/systemd/scylla-housekeeping-restart.timer /lib/systemd/system
|
||||
dist/common/systemd/scylla-fstrim.timer /lib/systemd/system
|
||||
dist/debian/scripts/scylla_save_coredump usr/lib/scylla
|
||||
dist/debian/scripts/scylla_delay_fstrim usr/lib/scylla
|
||||
tools/scyllatop usr/lib/scylla
|
||||
|
||||
4
dist/redhat/scylla.spec.mustache
vendored
4
dist/redhat/scylla.spec.mustache
vendored
@@ -15,6 +15,10 @@ Obsoletes: scylla-server < 1.1
|
||||
%global __brp_python_bytecompile %{nil}
|
||||
%global __brp_mangle_shebangs %{nil}
|
||||
|
||||
# Prevent find-debuginfo.sh from tempering with scylla's build-id (#5881)
|
||||
%undefine _unique_build_ids
|
||||
%global _no_recompute_build_ids 1
|
||||
|
||||
%description
|
||||
Scylla is a highly scalable, eventually consistent, distributed,
|
||||
partitioned row DB.
|
||||
|
||||
@@ -98,6 +98,13 @@ public:
|
||||
sstring get_message() const { return what(); }
|
||||
};
|
||||
|
||||
class server_exception : public cassandra_exception {
|
||||
public:
|
||||
server_exception(sstring msg) noexcept
|
||||
: exceptions::cassandra_exception{exceptions::exception_code::SERVER_ERROR, std::move(msg)}
|
||||
{ }
|
||||
};
|
||||
|
||||
class protocol_exception : public cassandra_exception {
|
||||
public:
|
||||
protocol_exception(sstring msg) noexcept
|
||||
|
||||
@@ -481,8 +481,7 @@ future<> gossiper::apply_state_locally(std::map<inet_address, endpoint_state> ma
|
||||
int local_generation = local_ep_state_ptr.get_heart_beat_state().get_generation();
|
||||
int remote_generation = remote_state.get_heart_beat_state().get_generation();
|
||||
logger.trace("{} local generation {}, remote generation {}", ep, local_generation, remote_generation);
|
||||
// A node was removed with nodetool removenode can have a generation of 2
|
||||
if (local_generation > 2 && remote_generation > local_generation + MAX_GENERATION_DIFFERENCE) {
|
||||
if (remote_generation > service::get_generation_number() + MAX_GENERATION_DIFFERENCE) {
|
||||
// assume some peer has corrupted memory and is broadcasting an unbelievable generation about another peer (or itself)
|
||||
logger.warn("received an invalid gossip generation for peer {}; local generation = {}, received generation = {}",
|
||||
ep, local_generation, remote_generation);
|
||||
@@ -1613,11 +1612,15 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map<application_stat
|
||||
// message on all cpus and forard them to cpu0 to process.
|
||||
return get_gossiper().invoke_on_all([do_bind] (gossiper& g) {
|
||||
g.init_messaging_service_handler(do_bind);
|
||||
}).then([this, generation_nbr, preload_local_states] {
|
||||
}).then([this, generation_nbr, preload_local_states] () mutable {
|
||||
build_seeds_list();
|
||||
/* initialize the heartbeat state for this localEndpoint */
|
||||
maybe_initialize_local_state(generation_nbr);
|
||||
if (_cfg.force_gossip_generation() > 0) {
|
||||
generation_nbr = _cfg.force_gossip_generation();
|
||||
logger.warn("Use the generation number provided by user: generation = {}", generation_nbr);
|
||||
}
|
||||
endpoint_state& local_state = endpoint_state_map[get_broadcast_address()];
|
||||
local_state.set_heart_beat_state_and_update_timestamp(heart_beat_state(generation_nbr));
|
||||
local_state.mark_alive();
|
||||
for (auto& entry : preload_local_states) {
|
||||
local_state.add_application_state(entry.first, entry.second);
|
||||
}
|
||||
@@ -1821,7 +1824,8 @@ future<> gossiper::do_stop_gossiping() {
|
||||
if (my_ep_state && !is_silent_shutdown_state(*my_ep_state)) {
|
||||
logger.info("Announcing shutdown");
|
||||
add_local_application_state(application_state::STATUS, _value_factory.shutdown(true)).get();
|
||||
for (inet_address addr : _live_endpoints) {
|
||||
auto live_endpoints = _live_endpoints;
|
||||
for (inet_address addr : live_endpoints) {
|
||||
msg_addr id = get_msg_addr(addr);
|
||||
logger.trace("Sending a GossipShutdown to {}", id);
|
||||
ms().send_gossip_shutdown(id, get_broadcast_address()).then_wrapped([id] (auto&&f) {
|
||||
|
||||
@@ -160,7 +160,9 @@ public:
|
||||
static constexpr std::chrono::milliseconds INTERVAL{1000};
|
||||
static constexpr std::chrono::hours A_VERY_LONG_TIME{24 * 3};
|
||||
|
||||
/** Maximimum difference in generation and version values we are willing to accept about a peer */
|
||||
// Maximimum difference between remote generation value and generation
|
||||
// value this node would get if this node were restarted that we are
|
||||
// willing to accept about a peer.
|
||||
static constexpr int64_t MAX_GENERATION_DIFFERENCE = 86400 * 365;
|
||||
std::chrono::milliseconds fat_client_timeout;
|
||||
|
||||
|
||||
@@ -26,6 +26,6 @@ class partition {
|
||||
|
||||
class reconcilable_result {
|
||||
uint32_t row_count();
|
||||
std::vector<partition> partitions();
|
||||
utils::chunked_vector<partition> partitions();
|
||||
query::short_read is_short_read() [[version 1.6]] = query::short_read::no;
|
||||
};
|
||||
|
||||
@@ -181,4 +181,10 @@ bool secondary_index_manager::is_index(const schema& s) const {
|
||||
});
|
||||
}
|
||||
|
||||
bool secondary_index_manager::is_global_index(const schema& s) const {
|
||||
return boost::algorithm::any_of(_indices | boost::adaptors::map_values, [&s] (const index& i) {
|
||||
return !i.metadata().local() && s.cf_name() == index_table_name(i.metadata().name());
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -77,6 +77,7 @@ public:
|
||||
std::vector<index> list_indexes() const;
|
||||
bool is_index(view_ptr) const;
|
||||
bool is_index(const schema& s) const;
|
||||
bool is_global_index(const schema& s) const;
|
||||
private:
|
||||
void add_index(const index_metadata& im);
|
||||
};
|
||||
|
||||
@@ -53,13 +53,13 @@ std::vector<inet_address> simple_strategy::calculate_natural_endpoints(const tok
|
||||
endpoints.reserve(replicas);
|
||||
|
||||
for (auto& token : tm.ring_range(t)) {
|
||||
if (endpoints.size() == replicas) {
|
||||
break;
|
||||
}
|
||||
auto ep = tm.get_endpoint(token);
|
||||
assert(ep);
|
||||
|
||||
endpoints.push_back(*ep);
|
||||
if (endpoints.size() == replicas) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return std::move(endpoints.get_vector());
|
||||
|
||||
10
main.cc
10
main.cc
@@ -54,6 +54,7 @@
|
||||
#include <seastar/core/file.hh>
|
||||
#include <sys/time.h>
|
||||
#include <sys/resource.h>
|
||||
#include <sys/prctl.h>
|
||||
#include "disk-error-handler.hh"
|
||||
#include "tracing/tracing.hh"
|
||||
#include "tracing/tracing_backend_registry.hh"
|
||||
@@ -323,6 +324,15 @@ static std::optional<std::vector<sstring>> parse_hinted_handoff_enabled(sstring
|
||||
}
|
||||
|
||||
int main(int ac, char** av) {
|
||||
// Allow core dumps. The would be disabled by default if
|
||||
// CAP_SYS_NICE was added to the binary, as is suggested by the
|
||||
// epoll backend.
|
||||
int r = prctl(PR_SET_DUMPABLE, 1, 0, 0, 0);
|
||||
if (r) {
|
||||
std::cerr << "Could not make scylla dumpable\n";
|
||||
exit(1);
|
||||
}
|
||||
|
||||
int return_value = 0;
|
||||
try {
|
||||
// early check to avoid triggering
|
||||
|
||||
@@ -39,6 +39,9 @@
|
||||
#include "mutation_cleaner.hh"
|
||||
#include <seastar/core/execution_stage.hh>
|
||||
#include "types/map.hh"
|
||||
#include "utils/exceptions.hh"
|
||||
|
||||
logging::logger mplog("mutation_partition");
|
||||
|
||||
template<bool reversed>
|
||||
struct reversal_traits;
|
||||
@@ -1227,7 +1230,9 @@ row::apply_monotonically(const column_definition& column, atomic_cell_or_collect
|
||||
void
|
||||
row::append_cell(column_id id, atomic_cell_or_collection value) {
|
||||
if (_type == storage_type::vector && id < max_vector_size) {
|
||||
assert(_storage.vector.v.size() <= id);
|
||||
if (_storage.vector.v.size() > id) {
|
||||
on_internal_error(mplog, format("Attempted to append cell#{} to row already having {} cells", id, _storage.vector.v.size()));
|
||||
}
|
||||
_storage.vector.v.resize(id);
|
||||
_storage.vector.v.emplace_back(cell_and_hash{std::move(value), cell_hash_opt()});
|
||||
_storage.vector.present.set(id);
|
||||
|
||||
@@ -397,7 +397,7 @@ public:
|
||||
if (is_missing() || _ttl == dead) {
|
||||
return false;
|
||||
}
|
||||
if (_ttl != no_ttl && _expiry < now) {
|
||||
if (_ttl != no_ttl && _expiry <= now) {
|
||||
return false;
|
||||
}
|
||||
return _timestamp > t.timestamp;
|
||||
@@ -407,7 +407,7 @@ public:
|
||||
if (_ttl == dead) {
|
||||
return true;
|
||||
}
|
||||
return _ttl != no_ttl && _expiry < now;
|
||||
return _ttl != no_ttl && _expiry <= now;
|
||||
}
|
||||
// Can be called only when is_live().
|
||||
bool is_expiring() const {
|
||||
@@ -447,7 +447,7 @@ public:
|
||||
_timestamp = api::missing_timestamp;
|
||||
return false;
|
||||
}
|
||||
if (_ttl > no_ttl && _expiry < now) {
|
||||
if (_ttl > no_ttl && _expiry <= now) {
|
||||
_expiry -= _ttl;
|
||||
_ttl = dead;
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ reconcilable_result::reconcilable_result()
|
||||
: _row_count(0)
|
||||
{ }
|
||||
|
||||
reconcilable_result::reconcilable_result(uint32_t row_count, std::vector<partition> p, query::short_read short_read,
|
||||
reconcilable_result::reconcilable_result(uint32_t row_count, utils::chunked_vector<partition> p, query::short_read short_read,
|
||||
query::result_memory_tracker memory_tracker)
|
||||
: _row_count(row_count)
|
||||
, _short_read(short_read)
|
||||
@@ -39,11 +39,11 @@ reconcilable_result::reconcilable_result(uint32_t row_count, std::vector<partiti
|
||||
, _partitions(std::move(p))
|
||||
{ }
|
||||
|
||||
const std::vector<partition>& reconcilable_result::partitions() const {
|
||||
const utils::chunked_vector<partition>& reconcilable_result::partitions() const {
|
||||
return _partitions;
|
||||
}
|
||||
|
||||
std::vector<partition>& reconcilable_result::partitions() {
|
||||
utils::chunked_vector<partition>& reconcilable_result::partitions() {
|
||||
return _partitions;
|
||||
}
|
||||
|
||||
|
||||
@@ -27,6 +27,7 @@
|
||||
#include "frozen_mutation.hh"
|
||||
#include "db/timeout_clock.hh"
|
||||
#include "querier.hh"
|
||||
#include "utils/chunked_vector.hh"
|
||||
#include <seastar/core/execution_stage.hh>
|
||||
|
||||
class reconcilable_result;
|
||||
@@ -72,17 +73,17 @@ class reconcilable_result {
|
||||
uint32_t _row_count;
|
||||
query::short_read _short_read;
|
||||
query::result_memory_tracker _memory_tracker;
|
||||
std::vector<partition> _partitions;
|
||||
utils::chunked_vector<partition> _partitions;
|
||||
public:
|
||||
~reconcilable_result();
|
||||
reconcilable_result();
|
||||
reconcilable_result(reconcilable_result&&) = default;
|
||||
reconcilable_result& operator=(reconcilable_result&&) = default;
|
||||
reconcilable_result(uint32_t row_count, std::vector<partition> partitions, query::short_read short_read,
|
||||
reconcilable_result(uint32_t row_count, utils::chunked_vector<partition> partitions, query::short_read short_read,
|
||||
query::result_memory_tracker memory_tracker = { });
|
||||
|
||||
const std::vector<partition>& partitions() const;
|
||||
std::vector<partition>& partitions();
|
||||
const utils::chunked_vector<partition>& partitions() const;
|
||||
utils::chunked_vector<partition>& partitions();
|
||||
|
||||
uint32_t row_count() const {
|
||||
return _row_count;
|
||||
@@ -112,7 +113,7 @@ class reconcilable_result_builder {
|
||||
const schema& _schema;
|
||||
const query::partition_slice& _slice;
|
||||
|
||||
std::vector<partition> _result;
|
||||
utils::chunked_vector<partition> _result;
|
||||
uint32_t _live_rows{};
|
||||
|
||||
bool _has_ck_selector{};
|
||||
|
||||
@@ -8,7 +8,6 @@ print_usage() {
|
||||
echo " --clean clean build directory"
|
||||
echo " --compiler C++ compiler path"
|
||||
echo " --c-compiler C compiler path"
|
||||
echo " --nodeps skip installing dependencies"
|
||||
exit 1
|
||||
}
|
||||
|
||||
@@ -16,7 +15,6 @@ JOBS=
|
||||
CLEAN=
|
||||
COMPILER=
|
||||
CCOMPILER=
|
||||
NODEPS=
|
||||
while [ $# -gt 0 ]; do
|
||||
case "$1" in
|
||||
"--jobs")
|
||||
@@ -36,7 +34,6 @@ while [ $# -gt 0 ]; do
|
||||
shift 2
|
||||
;;
|
||||
"--nodeps")
|
||||
NODEPS=yes
|
||||
shift 1
|
||||
;;
|
||||
*)
|
||||
@@ -66,10 +63,6 @@ if [ -f build/release/scylla-package.tar.gz ]; then
|
||||
rm build/release/scylla-package.tar.gz
|
||||
fi
|
||||
|
||||
if [ -z "$NODEPS" ]; then
|
||||
sudo ./install-dependencies.sh
|
||||
fi
|
||||
|
||||
NINJA=$(which ninja-build) &&:
|
||||
if [ -z "$NINJA" ]; then
|
||||
NINJA=$(which ninja) &&:
|
||||
|
||||
@@ -780,8 +780,10 @@ static future<> repair_cf_range(repair_info& ri,
|
||||
// still do our best to repair available replicas.
|
||||
std::vector<gms::inet_address> live_neighbors;
|
||||
std::vector<partition_checksum> live_neighbors_checksum;
|
||||
bool local_checksum_failed = false;
|
||||
for (unsigned i = 0; i < checksums.size(); i++) {
|
||||
if (checksums[i].failed()) {
|
||||
local_checksum_failed |= (i == 0);
|
||||
rlogger.warn(
|
||||
"Checksum of ks={}, table={}, range={} on {} failed: {}",
|
||||
ri.keyspace, cf, range,
|
||||
@@ -797,7 +799,7 @@ static future<> repair_cf_range(repair_info& ri,
|
||||
live_neighbors_checksum.push_back(checksums[i].get0());
|
||||
}
|
||||
}
|
||||
if (checksums[0].failed() || live_neighbors.empty()) {
|
||||
if (local_checksum_failed || live_neighbors.empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
// If one of the available checksums is different, repair
|
||||
|
||||
@@ -371,6 +371,10 @@ class repair_writer {
|
||||
std::vector<std::optional<seastar::queue<mutation_fragment_opt>>> _mq;
|
||||
// Current partition written to disk
|
||||
std::vector<lw_shared_ptr<const decorated_key_with_hash>> _current_dk_written_to_sstable;
|
||||
// Is current partition still open. A partition is opened when a
|
||||
// partition_start is written and is closed when a partition_end is
|
||||
// written.
|
||||
std::vector<bool> _partition_opened;
|
||||
public:
|
||||
repair_writer(
|
||||
schema_ptr schema,
|
||||
@@ -385,10 +389,13 @@ public:
|
||||
future<> write_start_and_mf(lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf, unsigned node_idx) {
|
||||
_current_dk_written_to_sstable[node_idx] = dk;
|
||||
if (mf.is_partition_start()) {
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf)));
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf))).then([this, node_idx] {
|
||||
_partition_opened[node_idx] = true;
|
||||
});
|
||||
} else {
|
||||
auto start = mutation_fragment(partition_start(dk->dk, tombstone()));
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(start))).then([this, node_idx, mf = std::move(mf)] () mutable {
|
||||
_partition_opened[node_idx] = true;
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf)));
|
||||
});
|
||||
}
|
||||
@@ -398,6 +405,7 @@ public:
|
||||
_writer_done.resize(_nr_peer_nodes);
|
||||
_mq.resize(_nr_peer_nodes);
|
||||
_current_dk_written_to_sstable.resize(_nr_peer_nodes);
|
||||
_partition_opened.resize(_nr_peer_nodes, false);
|
||||
}
|
||||
|
||||
void create_writer(unsigned node_idx) {
|
||||
@@ -434,12 +442,21 @@ public:
|
||||
t.stream_in_progress());
|
||||
}
|
||||
|
||||
future<> write_partition_end(unsigned node_idx) {
|
||||
if (_partition_opened[node_idx]) {
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment(partition_end())).then([this, node_idx] {
|
||||
_partition_opened[node_idx] = false;
|
||||
});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> do_write(unsigned node_idx, lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf) {
|
||||
if (_current_dk_written_to_sstable[node_idx]) {
|
||||
if (_current_dk_written_to_sstable[node_idx]->dk.equal(*_schema, dk->dk)) {
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf)));
|
||||
} else {
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment(partition_end())).then([this,
|
||||
return write_partition_end(node_idx).then([this,
|
||||
node_idx, dk = std::move(dk), mf = std::move(mf)] () mutable {
|
||||
return write_start_and_mf(std::move(dk), std::move(mf), node_idx);
|
||||
});
|
||||
@@ -453,7 +470,7 @@ public:
|
||||
return parallel_for_each(boost::irange(unsigned(0), unsigned(_nr_peer_nodes)), [this] (unsigned node_idx) {
|
||||
if (_writer_done[node_idx] && _mq[node_idx]) {
|
||||
// Partition_end is never sent on wire, so we have to write one ourselves.
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment(partition_end())).then([this, node_idx] () mutable {
|
||||
return write_partition_end(node_idx).then([this, node_idx] () mutable {
|
||||
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt()).then([this, node_idx] () mutable {
|
||||
return (*_writer_done[node_idx]).then([] (uint64_t partitions) {
|
||||
@@ -1458,7 +1475,7 @@ class row_level_repair {
|
||||
|
||||
// If the total size of the `_row_buf` on either of the nodes is zero,
|
||||
// we set this flag, which is an indication that rows are not synced.
|
||||
bool _zero_rows;
|
||||
bool _zero_rows = false;
|
||||
|
||||
// Sum of estimated_partitions on all peers
|
||||
uint64_t _estimated_partitions = 0;
|
||||
|
||||
@@ -925,7 +925,6 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
|
||||
});
|
||||
|
||||
return seastar::async([this, &m, updater = std::move(updater), real_dirty_acc = std::move(real_dirty_acc)] () mutable {
|
||||
coroutine update;
|
||||
size_t size_entry;
|
||||
// In case updater fails, we must bring the cache to consistency without deferring.
|
||||
auto cleanup = defer([&m, this] {
|
||||
@@ -933,6 +932,7 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
|
||||
_prev_snapshot_pos = {};
|
||||
_prev_snapshot = {};
|
||||
});
|
||||
coroutine update; // Destroy before cleanup to release snapshots before invalidating.
|
||||
partition_presence_checker is_present = _prev_snapshot->make_partition_presence_checker();
|
||||
while (!m.partitions.empty()) {
|
||||
with_allocator(_tracker.allocator(), [&] () {
|
||||
|
||||
@@ -78,7 +78,7 @@ for exe in executables:
|
||||
libs.update(ldd(exe))
|
||||
|
||||
# manually add libthread_db for debugging thread
|
||||
libs.update({'libthread_db-1.0.so': '/lib64/libthread_db-1.0.so'})
|
||||
libs.update({'libthread_db.so.1': '/lib64/libthread_db-1.0.so'})
|
||||
|
||||
ld_so = libs['ld.so']
|
||||
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 75488f6ef2...a51bd8b91a
@@ -162,13 +162,14 @@ future<lowres_clock::duration> cache_hitrate_calculator::recalculate_hitrates()
|
||||
auto& g = gms::get_local_gossiper();
|
||||
auto& ss = get_local_storage_service();
|
||||
_slen = _gstate.size();
|
||||
g.add_local_application_state(gms::application_state::CACHE_HITRATES, ss.value_factory.cache_hitrates(_gstate));
|
||||
// if max difference during this round is big schedule next recalculate earlier
|
||||
if (_diff < 0.01) {
|
||||
return std::chrono::milliseconds(2000);
|
||||
} else {
|
||||
return std::chrono::milliseconds(500);
|
||||
}
|
||||
return g.add_local_application_state(gms::application_state::CACHE_HITRATES, ss.value_factory.cache_hitrates(_gstate)).then([this] {
|
||||
// if max difference during this round is big schedule next recalculate earlier
|
||||
if (_diff < 0.01) {
|
||||
return std::chrono::milliseconds(2000);
|
||||
} else {
|
||||
return std::chrono::milliseconds(500);
|
||||
}
|
||||
});
|
||||
}).finally([this] {
|
||||
_gstate = std::string(); // free memory, do not trust clear() to do that for string
|
||||
_rates.clear();
|
||||
|
||||
@@ -350,7 +350,9 @@ public:
|
||||
}
|
||||
void on_released() {
|
||||
_expire_timer.cancel();
|
||||
_mutation_holder->release_mutation();
|
||||
if (_targets.size() == 0) {
|
||||
_mutation_holder->release_mutation();
|
||||
}
|
||||
}
|
||||
void timeout_cb() {
|
||||
if (_cl_achieved || _cl == db::consistency_level::ANY) {
|
||||
@@ -1558,6 +1560,14 @@ future<> storage_proxy::send_to_endpoint(
|
||||
allow_hints);
|
||||
}
|
||||
|
||||
future<> storage_proxy::mutate_hint_from_scratch(frozen_mutation_and_schema fm_a_s) {
|
||||
// FIXME: using 1h as infinite timeout. If a node is down, we should get an
|
||||
// unavailable exception.
|
||||
const auto timeout = db::timeout_clock::now() + 1h;
|
||||
std::array<mutation, 1> ms{fm_a_s.fm.unfreeze(fm_a_s.s)};
|
||||
return mutate_internal(std::move(ms), db::consistency_level::ALL, false, nullptr, timeout);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send the mutations to the right targets, write it locally if it corresponds or writes a hint when the node
|
||||
* is not available.
|
||||
@@ -2305,8 +2315,8 @@ public:
|
||||
|
||||
// build reconcilable_result from reconciled data
|
||||
// traverse backwards since large keys are at the start
|
||||
std::vector<partition> vec;
|
||||
auto r = boost::accumulate(reconciled_partitions | boost::adaptors::reversed, std::ref(vec), [] (std::vector<partition>& a, const mutation_and_live_row_count& m_a_rc) {
|
||||
utils::chunked_vector<partition> vec;
|
||||
auto r = boost::accumulate(reconciled_partitions | boost::adaptors::reversed, std::ref(vec), [] (utils::chunked_vector<partition>& a, const mutation_and_live_row_count& m_a_rc) {
|
||||
a.emplace_back(partition(m_a_rc.live_row_count, freeze(m_a_rc.mut)));
|
||||
return std::ref(a);
|
||||
});
|
||||
|
||||
@@ -387,6 +387,8 @@ public:
|
||||
*/
|
||||
future<> mutate_atomically(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state);
|
||||
|
||||
future<> mutate_hint_from_scratch(frozen_mutation_and_schema fm_a_s);
|
||||
|
||||
// Send a mutation to one specific remote target.
|
||||
// Inspired by Cassandra's StorageProxy.sendToHintedEndpoints but without
|
||||
// hinted handoff support, and just one target. See also
|
||||
|
||||
@@ -1440,7 +1440,8 @@ future<> storage_service::drain_on_shutdown() {
|
||||
ss._sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::stop).get();
|
||||
slogger.info("Drain on shutdown: system distributed keyspace stopped");
|
||||
|
||||
get_storage_proxy().invoke_on_all([&ss] (storage_proxy& local_proxy) mutable {
|
||||
get_storage_proxy().invoke_on_all([] (storage_proxy& local_proxy) mutable {
|
||||
auto& ss = service::get_local_storage_service();
|
||||
ss.unregister_subscriber(&local_proxy);
|
||||
return local_proxy.drain_on_shutdown();
|
||||
}).get();
|
||||
|
||||
@@ -3285,7 +3285,7 @@ delete_atomically(std::vector<shared_sstable> ssts) {
|
||||
dir_f.close().get();
|
||||
sstlog.debug("{} written successfully.", pending_delete_log);
|
||||
} catch (...) {
|
||||
sstlog.warn("Error while writing {}: {}. Ignoring.", pending_delete_log), std::current_exception();
|
||||
sstlog.warn("Error while writing {}: {}. Ignoring.", pending_delete_log, std::current_exception());
|
||||
}
|
||||
|
||||
parallel_for_each(ssts, [] (shared_sstable sst) {
|
||||
|
||||
2
table.cc
2
table.cc
@@ -2518,7 +2518,7 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema
|
||||
std::move(slice),
|
||||
std::move(m),
|
||||
[base, views = std::move(views), lock = std::move(lock), this, timeout, source = std::move(source), &io_priority] (auto& pk, auto& slice, auto& m) mutable {
|
||||
auto reader = source.make_reader(base, pk, slice, io_priority);
|
||||
auto reader = source.make_reader(base, pk, slice, io_priority, nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
|
||||
return this->generate_and_propagate_view_updates(base, std::move(views), std::move(m), std::move(reader)).then([lock = std::move(lock)] () mutable {
|
||||
// return the local partition/row lock we have taken so it
|
||||
// remains locked until the caller is done modifying this
|
||||
|
||||
@@ -77,3 +77,45 @@ BOOST_AUTO_TEST_CASE(test_make_random_uuid) {
|
||||
std::sort(uuids.begin(), uuids.end());
|
||||
BOOST_CHECK(std::unique(uuids.begin(), uuids.end()) == uuids.end());
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_get_time_uuid) {
|
||||
using namespace std::chrono;
|
||||
|
||||
auto uuid = utils::UUID_gen::get_time_UUID();
|
||||
BOOST_CHECK(uuid.is_timestamp());
|
||||
|
||||
auto tp = system_clock::now();
|
||||
uuid = utils::UUID_gen::get_time_UUID(tp);
|
||||
BOOST_CHECK(uuid.is_timestamp());
|
||||
|
||||
auto millis = duration_cast<milliseconds>(tp.time_since_epoch()).count();
|
||||
uuid = utils::UUID_gen::get_time_UUID(millis);
|
||||
BOOST_CHECK(uuid.is_timestamp());
|
||||
|
||||
auto unix_timestamp = utils::UUID_gen::unix_timestamp(uuid);
|
||||
BOOST_CHECK(unix_timestamp == millis);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_min_time_uuid) {
|
||||
using namespace std::chrono;
|
||||
|
||||
auto tp = system_clock::now();
|
||||
auto millis = duration_cast<milliseconds>(tp.time_since_epoch()).count();
|
||||
auto uuid = utils::UUID_gen::min_time_UUID(millis);
|
||||
BOOST_CHECK(uuid.is_timestamp());
|
||||
|
||||
auto unix_timestamp = utils::UUID_gen::unix_timestamp(uuid);
|
||||
BOOST_CHECK(unix_timestamp == millis);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_max_time_uuid) {
|
||||
using namespace std::chrono;
|
||||
|
||||
auto tp = system_clock::now();
|
||||
auto millis = duration_cast<milliseconds>(tp.time_since_epoch()).count();
|
||||
auto uuid = utils::UUID_gen::max_time_UUID(millis);
|
||||
BOOST_CHECK(uuid.is_timestamp());
|
||||
|
||||
auto unix_timestamp = utils::UUID_gen::unix_timestamp(uuid);
|
||||
BOOST_CHECK(unix_timestamp == millis);
|
||||
}
|
||||
|
||||
@@ -180,9 +180,13 @@ rows_assertions rows_assertions::with_serialized_columns_count(size_t columns_co
|
||||
}
|
||||
|
||||
shared_ptr<cql_transport::messages::result_message> cquery_nofail(
|
||||
cql_test_env& env, const char* query, const std::experimental::source_location& loc) {
|
||||
cql_test_env& env, const char* query, std::unique_ptr<cql3::query_options>&& qo, const std::experimental::source_location& loc) {
|
||||
try {
|
||||
return env.execute_cql(query).get0();
|
||||
if (qo) {
|
||||
return env.execute_cql(query, std::move(qo)).get0();
|
||||
} else {
|
||||
return env.execute_cql(query).get0();
|
||||
}
|
||||
} catch (...) {
|
||||
BOOST_FAIL(format("query '{}' failed: {}\n{}:{}: originally from here",
|
||||
query, std::current_exception(), loc.file_name(), loc.line()));
|
||||
|
||||
@@ -86,4 +86,5 @@ void assert_that_failed(future<T...>&& f)
|
||||
shared_ptr<cql_transport::messages::result_message> cquery_nofail(
|
||||
cql_test_env& env,
|
||||
const char* query,
|
||||
std::unique_ptr<cql3::query_options>&& qo = nullptr,
|
||||
const std::experimental::source_location& loc = std::experimental::source_location::current());
|
||||
|
||||
@@ -1526,6 +1526,18 @@ SEASTAR_TEST_CASE(test_user_type_nested) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_user_type_reversed) {
|
||||
return do_with_cql_env_thread([](cql_test_env& e) {
|
||||
e.execute_cql("create type my_type (a int);").get();
|
||||
e.execute_cql("create table tbl (a int, b frozen<my_type>, primary key ((a), b)) with clustering order by (b desc);").get();
|
||||
e.execute_cql("insert into tbl (a, b) values (1, (2));").get();
|
||||
assert_that(e.execute_cql("select a,b.a from tbl;").get0())
|
||||
.is_rows()
|
||||
.with_size(1)
|
||||
.with_row({int32_type->decompose(1), int32_type->decompose(2)});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_user_type) {
|
||||
return do_with_cql_env([] (cql_test_env& e) {
|
||||
return e.execute_cql("create type ut1 (my_int int, my_bigint bigint, my_text text);").discard_result().then([&e] {
|
||||
@@ -3577,3 +3589,239 @@ SEASTAR_TEST_CASE(test_describe_varchar) {
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_alter_type_on_compact_storage_with_no_regular_columns_does_not_crash) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
cquery_nofail(e, "CREATE TYPE my_udf (first text);");
|
||||
cquery_nofail(e, "create table z (pk int, ck frozen<my_udf>, primary key(pk, ck)) with compact storage;");
|
||||
cquery_nofail(e, "alter type my_udf add test_int int;");
|
||||
});
|
||||
}
|
||||
|
||||
shared_ptr<cql_transport::messages::result_message> cql_func_require_nofail(
|
||||
cql_test_env& env,
|
||||
const seastar::sstring& fct,
|
||||
const seastar::sstring& inp,
|
||||
std::unique_ptr<cql3::query_options>&& qo = nullptr,
|
||||
const std::experimental::source_location& loc = std::experimental::source_location::current()) {
|
||||
auto res = shared_ptr<cql_transport::messages::result_message>(nullptr);
|
||||
auto query = format("SELECT {}({}) FROM t;", fct, inp);
|
||||
try {
|
||||
if (qo) {
|
||||
res = env.execute_cql(query, std::move(qo)).get0();
|
||||
} else {
|
||||
res = env.execute_cql(query).get0();
|
||||
}
|
||||
BOOST_TEST_MESSAGE(format("Query '{}' succeeded as expected", query));
|
||||
} catch (...) {
|
||||
BOOST_ERROR(format("query '{}' failed unexpectedly with error: {}\n{}:{}: originally from here",
|
||||
query, std::current_exception(),
|
||||
loc.file_name(), loc.line()));
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
// FIXME: should be in cql_assertions, but we don't want to call boost from cql_assertions.hh
|
||||
template <typename Exception>
|
||||
void cql_func_require_throw(
|
||||
cql_test_env& env,
|
||||
const seastar::sstring& fct,
|
||||
const seastar::sstring& inp,
|
||||
std::unique_ptr<cql3::query_options>&& qo = nullptr,
|
||||
const std::experimental::source_location& loc = std::experimental::source_location::current()) {
|
||||
auto query = format("SELECT {}({}) FROM t;", fct, inp);
|
||||
try {
|
||||
if (qo) {
|
||||
env.execute_cql(query, std::move(qo)).get();
|
||||
} else {
|
||||
env.execute_cql(query).get();
|
||||
}
|
||||
BOOST_ERROR(format("query '{}' succeeded unexpectedly\n{}:{}: originally from here", query,
|
||||
loc.file_name(), loc.line()));
|
||||
} catch (Exception& e) {
|
||||
BOOST_TEST_MESSAGE(format("Query '{}' failed as expected with error: {}", query, e));
|
||||
} catch (...) {
|
||||
BOOST_ERROR(format("query '{}' failed with unexpected error: {}\n{}:{}: originally from here",
|
||||
query, std::current_exception(),
|
||||
loc.file_name(), loc.line()));
|
||||
}
|
||||
}
|
||||
|
||||
static void create_time_uuid_fcts_schema(cql_test_env& e) {
|
||||
cquery_nofail(e, "CREATE TABLE t (id int primary key, t timestamp, l bigint, f float, u timeuuid, d date)");
|
||||
cquery_nofail(e, "INSERT INTO t (id, t, l, f, u, d) VALUES "
|
||||
"(1, 1579072460606, 1579072460606000, 1579072460606, a66525e0-3766-11ea-8080-808080808080, '2020-01-13')");
|
||||
cquery_nofail(e, "SELECT * FROM t;");
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_basic_time_uuid_fcts) {
|
||||
return do_with_cql_env_thread([] (auto& e) {
|
||||
create_time_uuid_fcts_schema(e);
|
||||
|
||||
cql_func_require_nofail(e, "currenttime", "");
|
||||
cql_func_require_nofail(e, "currentdate", "");
|
||||
cql_func_require_nofail(e, "now", "");
|
||||
cql_func_require_nofail(e, "currenttimeuuid", "");
|
||||
cql_func_require_nofail(e, "currenttimestamp", "");
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_time_uuid_fcts_input_validation) {
|
||||
return do_with_cql_env_thread([] (auto& e) {
|
||||
create_time_uuid_fcts_schema(e);
|
||||
|
||||
// test timestamp arg
|
||||
auto require_timestamp = [&e] (const sstring& fct) {
|
||||
cql_func_require_nofail(e, fct, "t");
|
||||
cql_func_require_throw<exceptions::server_exception>(e, fct, "l");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "f");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "u");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "d");
|
||||
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttime()");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currentdate()");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "now()");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttimeuuid()");
|
||||
cql_func_require_nofail(e, fct, "currenttimestamp()");
|
||||
};
|
||||
|
||||
require_timestamp("mintimeuuid");
|
||||
require_timestamp("maxtimeuuid");
|
||||
|
||||
// test timeuuid arg
|
||||
auto require_timeuuid = [&e] (const sstring& fct) {
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "t");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "l");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "f");
|
||||
cql_func_require_nofail(e, fct, "u");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "d");
|
||||
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttime()");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currentdate()");
|
||||
cql_func_require_nofail(e, fct, "now()");
|
||||
cql_func_require_nofail(e, fct, "currenttimeuuid()");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttimestamp()");
|
||||
};
|
||||
|
||||
require_timeuuid("dateof");
|
||||
require_timeuuid("unixtimestampof");
|
||||
|
||||
// test timeuuid or date arg
|
||||
auto require_timeuuid_or_date = [&e] (const sstring& fct) {
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "t");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "l");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "f");
|
||||
cql_func_require_nofail(e, fct, "u");
|
||||
cql_func_require_nofail(e, fct, "d");
|
||||
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttime()");
|
||||
cql_func_require_nofail(e, fct, "currentdate()");
|
||||
cql_func_require_nofail(e, fct, "now()");
|
||||
cql_func_require_nofail(e, fct, "currenttimeuuid()");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttimestamp()");
|
||||
};
|
||||
|
||||
require_timeuuid_or_date("totimestamp");
|
||||
|
||||
// test timestamp or timeuuid arg
|
||||
auto require_timestamp_or_timeuuid = [&e] (const sstring& fct) {
|
||||
cql_func_require_nofail(e, fct, "t");
|
||||
cql_func_require_throw<std::exception>(e, fct, "l");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "f");
|
||||
cql_func_require_nofail(e, fct, "u");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "d");
|
||||
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttime()");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currentdate()");
|
||||
cql_func_require_nofail(e, fct, "now()");
|
||||
cql_func_require_nofail(e, fct, "currenttimeuuid()");
|
||||
cql_func_require_nofail(e, fct, "currenttimestamp()");
|
||||
};
|
||||
|
||||
require_timestamp_or_timeuuid("todate");
|
||||
|
||||
// test timestamp, timeuuid, or date arg
|
||||
auto require_timestamp_timeuuid_or_date = [&e] (const sstring& fct) {
|
||||
cql_func_require_nofail(e, fct, "t");
|
||||
cql_func_require_throw<exceptions::server_exception>(e, fct, "l");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "f");
|
||||
cql_func_require_nofail(e, fct, "u");
|
||||
cql_func_require_nofail(e, fct, "d");
|
||||
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttime()");
|
||||
cql_func_require_nofail(e, fct, "currentdate()");
|
||||
cql_func_require_nofail(e, fct, "now()");
|
||||
cql_func_require_nofail(e, fct, "currenttimeuuid()");
|
||||
cql_func_require_nofail(e, fct, "currenttimestamp()");
|
||||
};
|
||||
|
||||
require_timestamp_timeuuid_or_date("tounixtimestamp");
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_time_uuid_fcts_result) {
|
||||
return do_with_cql_env_thread([] (auto& e) {
|
||||
create_time_uuid_fcts_schema(e);
|
||||
|
||||
// test timestamp arg
|
||||
auto require_timestamp = [&e] (const sstring& fct) {
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "mintimeuuid(t)");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "maxtimeuuid(t)");
|
||||
cql_func_require_nofail(e, fct, "dateof(u)");
|
||||
cql_func_require_nofail(e, fct, "unixtimestampof(u)");
|
||||
cql_func_require_nofail(e, fct, "totimestamp(u)");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "todate(u)");
|
||||
cql_func_require_nofail(e, fct, "tounixtimestamp(u)");
|
||||
};
|
||||
|
||||
require_timestamp("mintimeuuid");
|
||||
require_timestamp("maxtimeuuid");
|
||||
|
||||
// test timeuuid arg
|
||||
auto require_timeuuid = [&e] (const sstring& fct) {
|
||||
cql_func_require_nofail(e, fct, "mintimeuuid(t)");
|
||||
cql_func_require_nofail(e, fct, "maxtimeuuid(t)");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "dateof(u)");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "unixtimestampof(u)");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "totimestamp(u)");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "todate(u)");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "tounixtimestamp(u)");
|
||||
};
|
||||
|
||||
require_timeuuid("dateof");
|
||||
require_timeuuid("unixtimestampof");
|
||||
|
||||
// test timeuuid or date arg
|
||||
auto require_timeuuid_or_date = [&e] (const sstring& fct) {
|
||||
cql_func_require_nofail(e, fct, "mintimeuuid(t)");
|
||||
cql_func_require_nofail(e, fct, "maxtimeuuid(t)");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "dateof(u)");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "unixtimestampof(u)");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "totimestamp(u)");
|
||||
cql_func_require_nofail(e, fct, "todate(u)");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "tounixtimestamp(u)");
|
||||
};
|
||||
|
||||
require_timeuuid_or_date("totimestamp");
|
||||
|
||||
// test timestamp or timeuuid arg
|
||||
auto require_timestamp_or_timeuuid = [&e] (const sstring& fct) {
|
||||
};
|
||||
|
||||
require_timestamp_or_timeuuid("todate");
|
||||
|
||||
// test timestamp, timeuuid, or date arg
|
||||
auto require_timestamp_timeuuid_or_date = [&e] (const sstring& fct) {
|
||||
cql_func_require_nofail(e, fct, "mintimeuuid(t)");
|
||||
cql_func_require_nofail(e, fct, "maxtimeuuid(t)");
|
||||
cql_func_require_nofail(e, fct, "dateof(u)");
|
||||
cql_func_require_nofail(e, fct, "unixtimestampof(u)");
|
||||
cql_func_require_nofail(e, fct, "totimestamp(u)");
|
||||
cql_func_require_nofail(e, fct, "todate(u)");
|
||||
cql_func_require_nofail(e, fct, "tounixtimestamp(u)");
|
||||
};
|
||||
|
||||
require_timestamp_timeuuid_or_date("tounixtimestamp");
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -25,7 +25,7 @@
|
||||
#include <seastar/util/noncopyable_function.hh>
|
||||
|
||||
inline
|
||||
void eventually(noncopyable_function<void ()> f, size_t max_attempts = 12) {
|
||||
void eventually(noncopyable_function<void ()> f, size_t max_attempts = 17) {
|
||||
size_t attempts = 0;
|
||||
while (true) {
|
||||
try {
|
||||
@@ -43,7 +43,7 @@ void eventually(noncopyable_function<void ()> f, size_t max_attempts = 12) {
|
||||
|
||||
inline
|
||||
bool eventually_true(noncopyable_function<bool ()> f) {
|
||||
const unsigned max_attempts = 10;
|
||||
const unsigned max_attempts = 15;
|
||||
unsigned attempts = 0;
|
||||
while (true) {
|
||||
if (f()) {
|
||||
|
||||
@@ -1253,6 +1253,104 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_upgrade_type_change) {
|
||||
assert_that(m).is_equal_to(m2);
|
||||
}
|
||||
|
||||
// This test checks the behavior of row_marker::{is_live, is_dead, compact_and_expire}. Those functions have some
|
||||
// duplicated logic that decides if a row is expired, and this test verifies that they behave the same with respect
|
||||
// to TTL.
|
||||
SEASTAR_THREAD_TEST_CASE(test_row_marker_expiry) {
|
||||
can_gc_fn never_gc = [] (tombstone) { return false; };
|
||||
|
||||
auto must_be_alive = [&] (row_marker mark, gc_clock::time_point t) {
|
||||
BOOST_TEST_MESSAGE(format("must_be_alive({}, {})", mark, t));
|
||||
BOOST_REQUIRE(mark.is_live(tombstone(), t));
|
||||
BOOST_REQUIRE(mark.is_missing() || !mark.is_dead(t));
|
||||
BOOST_REQUIRE(mark.compact_and_expire(tombstone(), t, never_gc, gc_clock::time_point()));
|
||||
};
|
||||
|
||||
auto must_be_dead = [&] (row_marker mark, gc_clock::time_point t) {
|
||||
BOOST_TEST_MESSAGE(format("must_be_dead({}, {})", mark, t));
|
||||
BOOST_REQUIRE(!mark.is_live(tombstone(), t));
|
||||
BOOST_REQUIRE(mark.is_missing() || mark.is_dead(t));
|
||||
BOOST_REQUIRE(!mark.compact_and_expire(tombstone(), t, never_gc, gc_clock::time_point()));
|
||||
};
|
||||
|
||||
const auto timestamp = api::timestamp_type(1);
|
||||
const auto t0 = gc_clock::now();
|
||||
const auto t1 = t0 + 1s;
|
||||
const auto t2 = t0 + 2s;
|
||||
const auto t3 = t0 + 3s;
|
||||
|
||||
// Without timestamp the marker is missing (doesn't exist)
|
||||
const row_marker m1;
|
||||
must_be_dead(m1, t0);
|
||||
must_be_dead(m1, t1);
|
||||
must_be_dead(m1, t2);
|
||||
must_be_dead(m1, t3);
|
||||
|
||||
// With timestamp and without ttl, a row_marker is always alive
|
||||
const row_marker m2(timestamp);
|
||||
must_be_alive(m2, t0);
|
||||
must_be_alive(m2, t1);
|
||||
must_be_alive(m2, t2);
|
||||
must_be_alive(m2, t3);
|
||||
|
||||
// A row_marker becomes dead exactly at the moment of expiry
|
||||
// Reproduces #4263, #5290
|
||||
const auto ttl = 1s;
|
||||
const row_marker m3(timestamp, ttl, t2);
|
||||
must_be_alive(m3, t0);
|
||||
must_be_alive(m3, t1);
|
||||
must_be_dead(m3, t2);
|
||||
must_be_dead(m3, t3);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_querying_expired_rows) {
|
||||
auto s = schema_builder("ks", "cf")
|
||||
.with_column("pk", bytes_type, column_kind::partition_key)
|
||||
.with_column("ck", bytes_type, column_kind::clustering_key)
|
||||
.build();
|
||||
|
||||
auto pk = partition_key::from_singular(*s, data_value(bytes("key1")));
|
||||
auto ckey1 = clustering_key::from_singular(*s, data_value(bytes("A")));
|
||||
auto ckey2 = clustering_key::from_singular(*s, data_value(bytes("B")));
|
||||
auto ckey3 = clustering_key::from_singular(*s, data_value(bytes("C")));
|
||||
|
||||
auto ttl = 1s;
|
||||
auto t0 = gc_clock::now();
|
||||
auto t1 = t0 + 1s;
|
||||
auto t2 = t0 + 2s;
|
||||
auto t3 = t0 + 3s;
|
||||
|
||||
auto results_at_time = [s] (const mutation& m, gc_clock::time_point t) {
|
||||
auto slice = partition_slice_builder(*s)
|
||||
.without_partition_key_columns()
|
||||
.build();
|
||||
auto opts = query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash};
|
||||
return query::result_set::from_raw_result(s, slice, m.query(slice, opts, t));
|
||||
};
|
||||
|
||||
mutation m(s, pk);
|
||||
m.partition().clustered_row(*m.schema(), ckey1).apply(row_marker(api::new_timestamp(), ttl, t1));
|
||||
m.partition().clustered_row(*m.schema(), ckey2).apply(row_marker(api::new_timestamp(), ttl, t2));
|
||||
m.partition().clustered_row(*m.schema(), ckey3).apply(row_marker(api::new_timestamp(), ttl, t3));
|
||||
|
||||
assert_that(results_at_time(m, t0))
|
||||
.has_size(3)
|
||||
.has(a_row().with_column("ck", data_value(bytes("A"))))
|
||||
.has(a_row().with_column("ck", data_value(bytes("B"))))
|
||||
.has(a_row().with_column("ck", data_value(bytes("C"))));
|
||||
|
||||
assert_that(results_at_time(m, t1))
|
||||
.has_size(2)
|
||||
.has(a_row().with_column("ck", data_value(bytes("B"))))
|
||||
.has(a_row().with_column("ck", data_value(bytes("C"))));
|
||||
|
||||
assert_that(results_at_time(m, t2))
|
||||
.has_size(1)
|
||||
.has(a_row().with_column("ck", data_value(bytes("C"))));
|
||||
|
||||
assert_that(results_at_time(m, t3)).is_empty();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_querying_expired_cells) {
|
||||
return seastar::async([] {
|
||||
auto s = schema_builder("ks", "cf")
|
||||
|
||||
@@ -421,6 +421,47 @@ public:
|
||||
virtual void on_drop_view(const sstring&, const sstring&) override { ++drop_view_count; }
|
||||
};
|
||||
|
||||
SEASTAR_TEST_CASE(test_alter_nested_type) {
|
||||
return do_with_cql_env_thread([](cql_test_env& e) {
|
||||
e.execute_cql("CREATE TYPE foo (foo_k int);").get();
|
||||
e.execute_cql("CREATE TYPE bar (bar_k frozen<foo>);").get();
|
||||
e.execute_cql("alter type foo add zed_v int;").get();
|
||||
e.execute_cql("CREATE TABLE tbl (key int PRIMARY KEY, val frozen<bar>);").get();
|
||||
e.execute_cql("insert into tbl (key, val) values (1, {bar_k: {foo_k: 2, zed_v: 3} });").get();
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_nested_type_mutation_in_update) {
|
||||
// ALTER TYPE always creates a mutation with a single type. This
|
||||
// creates a mutation with 2 types, one nested in the other, to
|
||||
// show that we can handle that.
|
||||
return do_with_cql_env_thread([](cql_test_env& e) {
|
||||
counting_migration_listener listener;
|
||||
service::get_local_migration_manager().register_listener(&listener);
|
||||
|
||||
e.execute_cql("CREATE TYPE foo (foo_k int);").get();
|
||||
e.execute_cql("CREATE TYPE bar (bar_k frozen<foo>);").get();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(listener.create_user_type_count, 2);
|
||||
|
||||
service::migration_manager& mm = service::get_local_migration_manager();
|
||||
auto&& keyspace = e.db().local().find_keyspace("ks").metadata();
|
||||
|
||||
auto type1 = user_type_impl::get_instance("ks", to_bytes("foo"), {"foo_k", "extra"}, {int32_type, int32_type});
|
||||
auto muts1 = db::schema_tables::make_create_type_mutations(keyspace, type1, api::new_timestamp());
|
||||
|
||||
auto type2 = user_type_impl::get_instance("ks", to_bytes("bar"), {"bar_k", "extra"}, {type1, int32_type});
|
||||
auto muts2 = db::schema_tables::make_create_type_mutations(keyspace, type2, api::new_timestamp());
|
||||
|
||||
auto muts = muts1;
|
||||
muts.insert(muts.end(), muts2.begin(), muts2.end());
|
||||
mm.announce(std::move(muts), false).get();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(listener.create_user_type_count, 2);
|
||||
BOOST_REQUIRE_EQUAL(listener.update_user_type_count, 2);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_notifications) {
|
||||
return do_with_cql_env([](cql_test_env& e) {
|
||||
return seastar::async([&] {
|
||||
|
||||
@@ -27,6 +27,7 @@
|
||||
#include "types/map.hh"
|
||||
#include "types/list.hh"
|
||||
#include "types/set.hh"
|
||||
#include "cql3/statements/select_statement.hh"
|
||||
|
||||
|
||||
SEASTAR_TEST_CASE(test_secondary_index_regular_column_query) {
|
||||
@@ -1101,3 +1102,65 @@ SEASTAR_TEST_CASE(test_secondary_index_on_partition_key_with_filtering) {
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_indexing_paging_and_aggregation) {
|
||||
static constexpr int row_count = 2 * cql3::statements::select_statement::DEFAULT_COUNT_PAGE_SIZE + 120;
|
||||
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
cquery_nofail(e, "CREATE TABLE fpa (id int primary key, v int)");
|
||||
cquery_nofail(e, "CREATE INDEX ON fpa(v)");
|
||||
for (int i = 0; i < row_count; ++i) {
|
||||
cquery_nofail(e, format("INSERT INTO fpa (id, v) VALUES ({}, {})", i + 1, i % 2).c_str());
|
||||
}
|
||||
|
||||
eventually([&] {
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{2, nullptr, {}, api::new_timestamp()});
|
||||
auto msg = cquery_nofail(e, "SELECT sum(id) FROM fpa WHERE v = 0;", std::move(qo));
|
||||
// Even though we set up paging, we still expect a single result from an aggregation function.
|
||||
// Also, instead of the user-provided page size, internal DEFAULT_COUNT_PAGE_SIZE is expected to be used.
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
{ int32_type->decompose(row_count * row_count / 4)},
|
||||
});
|
||||
|
||||
// Even if paging is not explicitly used, the query will be internally paged to avoid OOM.
|
||||
msg = cquery_nofail(e, "SELECT sum(id) FROM fpa WHERE v = 1;");
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
{ int32_type->decompose(row_count * row_count / 4 + row_count / 2)},
|
||||
});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()});
|
||||
msg = cquery_nofail(e, "SELECT avg(id) FROM fpa WHERE v = 1;", std::move(qo));
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
{ int32_type->decompose(row_count / 2 + 1)},
|
||||
});
|
||||
});
|
||||
|
||||
// Similar, but this time a non-prefix clustering key part is indexed (wrt. issue 3405, after which we have
|
||||
// a special code path for indexing composite non-prefix clustering keys).
|
||||
cquery_nofail(e, "CREATE TABLE fpa2 (id int, c1 int, c2 int, primary key (id, c1, c2))");
|
||||
cquery_nofail(e, "CREATE INDEX ON fpa2(c2)");
|
||||
|
||||
eventually([&] {
|
||||
for (int i = 0; i < row_count; ++i) {
|
||||
cquery_nofail(e, format("INSERT INTO fpa2 (id, c1, c2) VALUES ({}, {}, {})", i + 1, i + 1, i % 2).c_str());
|
||||
}
|
||||
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{2, nullptr, {}, api::new_timestamp()});
|
||||
auto msg = cquery_nofail(e, "SELECT sum(id) FROM fpa2 WHERE c2 = 0;", std::move(qo));
|
||||
// Even though we set up paging, we still expect a single result from an aggregation function
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
{ int32_type->decompose(row_count * row_count / 4)},
|
||||
});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()});
|
||||
msg = cquery_nofail(e, "SELECT avg(id) FROM fpa2 WHERE c2 = 1;", std::move(qo));
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
{ int32_type->decompose(row_count / 2 + 1)},
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -385,6 +385,8 @@ BOOST_AUTO_TEST_CASE(test_varint) {
|
||||
BOOST_CHECK_EQUAL(value_cast<boost::multiprecision::cpp_int>(varint_type->deserialize(from_hex("00deadbeef"))), boost::multiprecision::cpp_int("0xdeadbeef"));
|
||||
BOOST_CHECK_EQUAL(value_cast<boost::multiprecision::cpp_int>(varint_type->deserialize(from_hex("00ffffffffffffffffffffffffffffffff"))), boost::multiprecision::cpp_int("340282366920938463463374607431768211455"));
|
||||
|
||||
BOOST_REQUIRE_EQUAL(from_hex("80000000"), varint_type->decompose(boost::multiprecision::cpp_int(-2147483648)));
|
||||
|
||||
test_parsing_fails(varint_type, "1A");
|
||||
}
|
||||
|
||||
|
||||
@@ -515,7 +515,7 @@ SEASTAR_TEST_CASE(test_update_column_not_in_view_with_flush) {
|
||||
}
|
||||
|
||||
void test_partial_update_with_unselected_collections(cql_test_env& e, std::function<void()>&& maybe_flush) {
|
||||
e.execute_cql("create table cf (p int, c int, a int, b int, l list<int>, s set<int>, m map<int,int>, primary key (p, c))").get();
|
||||
e.execute_cql("create table cf (p int, c int, a int, b int, l list<int>, s set<int>, m map<int,text>, primary key (p, c))").get();
|
||||
e.execute_cql("create materialized view vcf as select a, b from cf "
|
||||
"where p is not null and c is not null "
|
||||
"primary key (c, p)").get();
|
||||
@@ -563,7 +563,7 @@ e.execute_cql("create table cf (p int, c int, a int, b int, l list<int>, s set<i
|
||||
assert_that(msg).is_rows().is_empty();
|
||||
});
|
||||
|
||||
e.execute_cql("update cf set m=m+{3:3}, l=l-[1], s=s-{2} where p = 1 and c = 1").get();
|
||||
e.execute_cql("update cf set m=m+{3:'text'}, l=l-[1], s=s-{2} where p = 1 and c = 1").get();
|
||||
maybe_flush();
|
||||
eventually([&] {
|
||||
auto msg = e.execute_cql("select * from vcf").get0();
|
||||
|
||||
@@ -38,6 +38,7 @@ if [[ "$1" = -* ]]; then
|
||||
fi
|
||||
|
||||
docker_common_args=(
|
||||
--pids-limit -1 \
|
||||
--network host \
|
||||
-u "$(id -u):$(id -g)" \
|
||||
"${group_args[@]}" \
|
||||
|
||||
@@ -206,8 +206,9 @@ void tracing::set_trace_probability(double p) {
|
||||
}
|
||||
|
||||
one_session_records::one_session_records()
|
||||
: backend_state_ptr(tracing::get_local_tracing_instance().allocate_backend_session_state())
|
||||
, budget_ptr(tracing::get_local_tracing_instance().get_cached_records_ptr()) {}
|
||||
: _local_tracing_ptr(tracing::get_local_tracing_instance().shared_from_this())
|
||||
, backend_state_ptr(_local_tracing_ptr->allocate_backend_session_state())
|
||||
, budget_ptr(_local_tracing_ptr->get_cached_records_ptr()) {}
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const span_id& id) {
|
||||
return os << id.get_id();
|
||||
|
||||
@@ -240,6 +240,8 @@ public:
|
||||
};
|
||||
|
||||
class one_session_records {
|
||||
private:
|
||||
shared_ptr<tracing> _local_tracing_ptr;
|
||||
public:
|
||||
utils::UUID session_id;
|
||||
session_record session_rec;
|
||||
@@ -665,7 +667,7 @@ private:
|
||||
|
||||
void one_session_records::set_pending_for_write() {
|
||||
_is_pending_for_write = true;
|
||||
budget_ptr = tracing::get_local_tracing_instance().get_pending_records_ptr();
|
||||
budget_ptr = _local_tracing_ptr->get_pending_records_ptr();
|
||||
}
|
||||
|
||||
void one_session_records::data_consumed() {
|
||||
@@ -674,7 +676,7 @@ void one_session_records::data_consumed() {
|
||||
}
|
||||
|
||||
_is_pending_for_write = false;
|
||||
budget_ptr = tracing::get_local_tracing_instance().get_cached_records_ptr();
|
||||
budget_ptr = _local_tracing_ptr->get_cached_records_ptr();
|
||||
}
|
||||
|
||||
inline span_id span_id::make_span_id() {
|
||||
|
||||
16
types.cc
16
types.cc
@@ -1558,6 +1558,13 @@ public:
|
||||
}
|
||||
out = std::copy(b.crbegin(), b.crend(), out);
|
||||
}
|
||||
static size_t serialized_size_aux(const boost::multiprecision::cpp_int& num) {
|
||||
if (num) {
|
||||
return align_up(boost::multiprecision::msb(num) + 2, 8u) / 8;
|
||||
} else {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
virtual size_t serialized_size(const void* value) const override {
|
||||
if (!value) {
|
||||
return 0;
|
||||
@@ -1570,8 +1577,10 @@ public:
|
||||
if (!num) {
|
||||
return 1;
|
||||
}
|
||||
auto pnum = abs(num);
|
||||
return align_up(boost::multiprecision::msb(pnum) + 2, 8u) / 8;
|
||||
if (num < 0) {
|
||||
return serialized_size_aux(-num - 1);
|
||||
}
|
||||
return serialized_size_aux(num);
|
||||
}
|
||||
virtual int32_t compare(bytes_view v1, bytes_view v2) const override {
|
||||
if (v1.empty()) {
|
||||
@@ -2087,8 +2096,7 @@ struct empty_type_impl : abstract_type {
|
||||
return false;
|
||||
}
|
||||
virtual std::optional<data_type> update_user_type(const shared_ptr<const user_type_impl> updated) const {
|
||||
// Can't happen
|
||||
abort();
|
||||
return std::nullopt;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -51,6 +51,7 @@ public:
|
||||
bytes_view field_name(size_t i) const { return _field_names[i]; }
|
||||
sstring field_name_as_string(size_t i) const { return _string_field_names[i]; }
|
||||
const std::vector<bytes>& field_names() const { return _field_names; }
|
||||
const std::vector<sstring>& string_field_names() const { return _string_field_names; }
|
||||
sstring get_name_as_string() const;
|
||||
virtual sstring cql3_type_name_impl() const override;
|
||||
virtual bool is_native() const override { return false; }
|
||||
|
||||
@@ -59,11 +59,15 @@ public:
|
||||
return (most_sig_bits >> 12) & 0xf;
|
||||
}
|
||||
|
||||
bool is_timestamp() const {
|
||||
return version() == 1;
|
||||
}
|
||||
|
||||
int64_t timestamp() const {
|
||||
//if (version() != 1) {
|
||||
// throw new UnsupportedOperationException("Not a time-based UUID");
|
||||
//}
|
||||
assert(version() == 1);
|
||||
assert(is_timestamp());
|
||||
|
||||
return ((most_sig_bits & 0xFFF) << 48) |
|
||||
(((most_sig_bits >> 16) & 0xFFFF) << 32) |
|
||||
|
||||
@@ -75,7 +75,7 @@ private:
|
||||
// placement of this singleton is important. It needs to be instantiated *AFTER* the other statics.
|
||||
static thread_local const std::unique_ptr<UUID_gen> instance;
|
||||
|
||||
int64_t last_nanos = 0;
|
||||
uint64_t last_nanos = 0;
|
||||
|
||||
UUID_gen()
|
||||
{
|
||||
@@ -91,7 +91,9 @@ public:
|
||||
*/
|
||||
static UUID get_time_UUID()
|
||||
{
|
||||
return UUID(instance->create_time_safe(), clock_seq_and_node);
|
||||
auto uuid = UUID(instance->create_time_safe(), clock_seq_and_node);
|
||||
assert(uuid.is_timestamp());
|
||||
return uuid;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -101,7 +103,9 @@ public:
|
||||
*/
|
||||
static UUID get_time_UUID(int64_t when)
|
||||
{
|
||||
return UUID(create_time(from_unix_timestamp(when)), clock_seq_and_node);
|
||||
auto uuid = UUID(create_time(from_unix_timestamp(when)), clock_seq_and_node);
|
||||
assert(uuid.is_timestamp());
|
||||
return uuid;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -115,12 +119,21 @@ public:
|
||||
// "nanos" needs to be in 100ns intervals since the adoption of the Gregorian calendar in the West.
|
||||
uint64_t nanos = duration_cast<nanoseconds>(tp.time_since_epoch()).count() / 100;
|
||||
nanos -= (10000ULL * START_EPOCH);
|
||||
return UUID(create_time(nanos), clock_seq_and_node);
|
||||
auto uuid = UUID(create_time(nanos), clock_seq_and_node);
|
||||
assert(uuid.is_timestamp());
|
||||
return uuid;
|
||||
}
|
||||
|
||||
static UUID get_time_UUID(int64_t when, int64_t clock_seq_and_node)
|
||||
{
|
||||
return UUID(create_time(from_unix_timestamp(when)), clock_seq_and_node);
|
||||
auto uuid = UUID(create_time(from_unix_timestamp(when)), clock_seq_and_node);
|
||||
assert(uuid.is_timestamp());
|
||||
return uuid;
|
||||
}
|
||||
|
||||
/** validates uuid from raw bytes. */
|
||||
static bool is_valid_UUID(bytes raw) {
|
||||
return raw.size() == 16;
|
||||
}
|
||||
|
||||
/** creates uuid from raw bytes. */
|
||||
@@ -176,7 +189,9 @@ public:
|
||||
*/
|
||||
static UUID min_time_UUID(int64_t timestamp)
|
||||
{
|
||||
return UUID(create_time(from_unix_timestamp(timestamp)), MIN_CLOCK_SEQ_AND_NODE);
|
||||
auto uuid = UUID(create_time(from_unix_timestamp(timestamp)), MIN_CLOCK_SEQ_AND_NODE);
|
||||
assert(uuid.is_timestamp());
|
||||
return uuid;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -192,7 +207,9 @@ public:
|
||||
// timestamp 1ms, then we should not extend 100's nanoseconds
|
||||
// precision by taking 10000, but rather 19999.
|
||||
int64_t uuid_tstamp = from_unix_timestamp(timestamp + 1) - 1;
|
||||
return UUID(create_time(uuid_tstamp), MAX_CLOCK_SEQ_AND_NODE);
|
||||
auto uuid = UUID(create_time(uuid_tstamp), MAX_CLOCK_SEQ_AND_NODE);
|
||||
assert(uuid.is_timestamp());
|
||||
return uuid;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -276,6 +293,15 @@ public:
|
||||
return (uuid.timestamp() / 10000) + START_EPOCH;
|
||||
}
|
||||
|
||||
static uint64_t make_nanos_since(int64_t millis) {
|
||||
return (static_cast<uint64_t>(millis) - static_cast<uint64_t>(START_EPOCH)) * 10000;
|
||||
}
|
||||
|
||||
// nanos_since must fit in 60 bits
|
||||
static bool is_valid_nanos_since(uint64_t nanos_since) {
|
||||
return !(0xf000000000000000UL & nanos_since);
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
// needs to return two different values for the same when.
|
||||
@@ -287,7 +313,7 @@ private:
|
||||
using namespace std::chrono;
|
||||
int64_t millis = duration_cast<milliseconds>(
|
||||
system_clock::now().time_since_epoch()).count();
|
||||
int64_t nanos_since = (millis - START_EPOCH) * 10000;
|
||||
uint64_t nanos_since = make_nanos_since(millis);
|
||||
if (nanos_since > last_nanos)
|
||||
last_nanos = nanos_since;
|
||||
else
|
||||
@@ -298,16 +324,17 @@ private:
|
||||
|
||||
int64_t create_time_unsafe(int64_t when, int nanos)
|
||||
{
|
||||
uint64_t nanos_since = ((when - START_EPOCH) * 10000) + nanos;
|
||||
uint64_t nanos_since = make_nanos_since(when) + static_cast<uint64_t>(static_cast<int64_t>(nanos));
|
||||
return create_time(nanos_since);
|
||||
}
|
||||
|
||||
static int64_t create_time(uint64_t nanos_since)
|
||||
{
|
||||
uint64_t msb = 0L;
|
||||
assert(is_valid_nanos_since(nanos_since));
|
||||
msb |= (0x00000000ffffffffL & nanos_since) << 32;
|
||||
msb |= (0x0000ffff00000000UL & nanos_since) >> 16;
|
||||
msb |= (0xffff000000000000UL & nanos_since) >> 48;
|
||||
msb |= (0x0fff000000000000UL & nanos_since) >> 48;
|
||||
msb |= 0x0000000000001000L; // sets the version to 1.
|
||||
return msb;
|
||||
}
|
||||
|
||||
@@ -246,10 +246,18 @@ public:
|
||||
public:
|
||||
const T& front() const { return *cbegin(); }
|
||||
T& front() { return *begin(); }
|
||||
iterator begin() const { return iterator(_chunks.data(), 0); }
|
||||
iterator end() const { return iterator(_chunks.data(), _size); }
|
||||
iterator begin() { return iterator(_chunks.data(), 0); }
|
||||
iterator end() { return iterator(_chunks.data(), _size); }
|
||||
const_iterator begin() const { return const_iterator(_chunks.data(), 0); }
|
||||
const_iterator end() const { return const_iterator(_chunks.data(), _size); }
|
||||
const_iterator cbegin() const { return const_iterator(_chunks.data(), 0); }
|
||||
const_iterator cend() const { return const_iterator(_chunks.data(), _size); }
|
||||
std::reverse_iterator<iterator> rbegin() { return std::reverse_iterator(end()); }
|
||||
std::reverse_iterator<iterator> rend() { return std::reverse_iterator(begin()); }
|
||||
std::reverse_iterator<const_iterator> rbegin() const { return std::reverse_iterator(end()); }
|
||||
std::reverse_iterator<const_iterator> rend() const { return std::reverse_iterator(begin()); }
|
||||
std::reverse_iterator<const_iterator> crbegin() const { return std::reverse_iterator(cend()); }
|
||||
std::reverse_iterator<const_iterator> crend() const { return std::reverse_iterator(cbegin()); }
|
||||
public:
|
||||
bool operator==(const chunked_vector& x) const {
|
||||
return boost::equal(*this, x);
|
||||
|
||||
@@ -2065,6 +2065,17 @@ bool segment_pool::migrate_segment(segment* src, segment* dst)
|
||||
#endif
|
||||
|
||||
void tracker::impl::register_region(region::impl* r) {
|
||||
// If needed, increase capacity of regions before taking the reclaim lock,
|
||||
// to avoid failing an allocation when push_back() tries to increase
|
||||
// capacity.
|
||||
//
|
||||
// The capacity increase is atomic (wrt _regions) so it cannot be
|
||||
// observed
|
||||
if (_regions.size() == _regions.capacity()) {
|
||||
auto copy = _regions;
|
||||
copy.reserve(copy.capacity() * 2);
|
||||
_regions = std::move(copy);
|
||||
}
|
||||
reclaiming_lock _(*this);
|
||||
_regions.push_back(r);
|
||||
llogger.debug("Registered region @{} with id={}", r, r->id());
|
||||
|
||||
Reference in New Issue
Block a user