Compare commits
44 Commits
scylla-3.2
...
next-3.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d112a230c0 | ||
|
|
4371cb41d0 | ||
|
|
a8b9f94dcb | ||
|
|
77500f9171 | ||
|
|
13328e7253 | ||
|
|
79b58f89f1 | ||
|
|
ba2821ec70 | ||
|
|
d72555e786 | ||
|
|
4c38534f75 | ||
|
|
a092f5d1f4 | ||
|
|
723fd50712 | ||
|
|
89deac7795 | ||
|
|
3843e5233c | ||
|
|
1b3c78480c | ||
|
|
48253eb183 | ||
|
|
5d60522c81 | ||
|
|
63e93110d1 | ||
|
|
83105efba8 | ||
|
|
5840eb602a | ||
|
|
61738999ea | ||
|
|
0b23e7145d | ||
|
|
3374aa20bb | ||
|
|
c4e89ea1b0 | ||
|
|
26d9ce6b98 | ||
|
|
6d1a4e2c0b | ||
|
|
fad143a441 | ||
|
|
bc07b877a5 | ||
|
|
09ad011f98 | ||
|
|
b34973df4e | ||
|
|
d65e2ac6af | ||
|
|
dbf72c72b3 | ||
|
|
b542b9c89a | ||
|
|
c0e493edcc | ||
|
|
88718996ed | ||
|
|
97236a2cee | ||
|
|
6c272b48f5 | ||
|
|
6a8ae87efa | ||
|
|
d24d9d037e | ||
|
|
43766bd453 | ||
|
|
ddd8f9b1d1 | ||
|
|
e3e301906d | ||
|
|
2c822d4c1f | ||
|
|
04f8800b5b | ||
|
|
a72a06d3b7 |
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=3.2.0
|
||||
VERSION=3.2.5
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -254,6 +254,9 @@ void set_storage_service(http_context& ctx, routes& r) {
|
||||
if (column_family.empty()) {
|
||||
resp = service::get_local_storage_service().take_snapshot(tag, keynames);
|
||||
} else {
|
||||
if (keynames.empty()) {
|
||||
throw httpd::bad_param_exception("The keyspace of column families must be specified");
|
||||
}
|
||||
if (keynames.size() > 1) {
|
||||
throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family");
|
||||
}
|
||||
|
||||
@@ -244,7 +244,6 @@ batch_size_fail_threshold_in_kb: 50
|
||||
# experimental_features:
|
||||
# - cdc
|
||||
# - lwt
|
||||
# - udf
|
||||
|
||||
# The directory where hints files are stored if hinted handoff is enabled.
|
||||
# hints_directory: /var/lib/scylla/hints
|
||||
|
||||
@@ -266,7 +266,7 @@ bool column_condition::applies_to(const data_value* cell_value, const query_opti
|
||||
return value.has_value() && is_satisfied_by(operator_type::EQ, *cell_value->type(), *column.type, *cell_value, *value);
|
||||
});
|
||||
} else {
|
||||
return std::any_of(in_values.begin(), in_values.end(), [] (const bytes_opt& value) { return value.has_value() == false; });
|
||||
return std::any_of(in_values.begin(), in_values.end(), [] (const bytes_opt& value) { return !value.has_value() || value->empty(); });
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -61,6 +61,16 @@ make_now_fct() {
|
||||
});
|
||||
}
|
||||
|
||||
static int64_t get_valid_timestamp(const data_value& ts_obj) {
|
||||
auto ts = value_cast<db_clock::time_point>(ts_obj);
|
||||
int64_t ms = ts.time_since_epoch().count();
|
||||
auto nanos_since = utils::UUID_gen::make_nanos_since(ms);
|
||||
if (!utils::UUID_gen::is_valid_nanos_since(nanos_since)) {
|
||||
throw exceptions::server_exception(format("{}: timestamp is out of range. Must be in milliseconds since epoch", ms));
|
||||
}
|
||||
return ms;
|
||||
}
|
||||
|
||||
inline
|
||||
shared_ptr<function>
|
||||
make_min_timeuuid_fct() {
|
||||
@@ -74,8 +84,7 @@ make_min_timeuuid_fct() {
|
||||
if (ts_obj.is_null()) {
|
||||
return {};
|
||||
}
|
||||
auto ts = value_cast<db_clock::time_point>(ts_obj);
|
||||
auto uuid = utils::UUID_gen::min_time_UUID(ts.time_since_epoch().count());
|
||||
auto uuid = utils::UUID_gen::min_time_UUID(get_valid_timestamp(ts_obj));
|
||||
return {timeuuid_type->decompose(uuid)};
|
||||
});
|
||||
}
|
||||
@@ -85,7 +94,6 @@ shared_ptr<function>
|
||||
make_max_timeuuid_fct() {
|
||||
return make_native_scalar_function<true>("maxtimeuuid", timeuuid_type, { timestamp_type },
|
||||
[] (cql_serialization_format sf, const std::vector<bytes_opt>& values) -> bytes_opt {
|
||||
// FIXME: should values be a vector<optional<bytes>>?
|
||||
auto& bb = values[0];
|
||||
if (!bb) {
|
||||
return {};
|
||||
@@ -94,12 +102,22 @@ make_max_timeuuid_fct() {
|
||||
if (ts_obj.is_null()) {
|
||||
return {};
|
||||
}
|
||||
auto ts = value_cast<db_clock::time_point>(ts_obj);
|
||||
auto uuid = utils::UUID_gen::max_time_UUID(ts.time_since_epoch().count());
|
||||
auto uuid = utils::UUID_gen::max_time_UUID(get_valid_timestamp(ts_obj));
|
||||
return {timeuuid_type->decompose(uuid)};
|
||||
});
|
||||
}
|
||||
|
||||
inline utils::UUID get_valid_timeuuid(bytes raw) {
|
||||
if (!utils::UUID_gen::is_valid_UUID(raw)) {
|
||||
throw exceptions::server_exception(format("invalid timeuuid: size={}", raw.size()));
|
||||
}
|
||||
auto uuid = utils::UUID_gen::get_UUID(raw);
|
||||
if (!uuid.is_timestamp()) {
|
||||
throw exceptions::server_exception(format("{}: Not a timeuuid: version={}", uuid, uuid.version()));
|
||||
}
|
||||
return uuid;
|
||||
}
|
||||
|
||||
inline
|
||||
shared_ptr<function>
|
||||
make_date_of_fct() {
|
||||
@@ -110,7 +128,7 @@ make_date_of_fct() {
|
||||
if (!bb) {
|
||||
return {};
|
||||
}
|
||||
auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(UUID_gen::get_UUID(*bb))));
|
||||
auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(get_valid_timeuuid(*bb))));
|
||||
return {timestamp_type->decompose(ts)};
|
||||
});
|
||||
}
|
||||
@@ -125,7 +143,7 @@ make_unix_timestamp_of_fct() {
|
||||
if (!bb) {
|
||||
return {};
|
||||
}
|
||||
return {long_type->decompose(UUID_gen::unix_timestamp(UUID_gen::get_UUID(*bb)))};
|
||||
return {long_type->decompose(UUID_gen::unix_timestamp(get_valid_timeuuid(*bb)))};
|
||||
});
|
||||
}
|
||||
|
||||
@@ -176,7 +194,7 @@ make_timeuuidtodate_fct() {
|
||||
if (!bb) {
|
||||
return {};
|
||||
}
|
||||
auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(UUID_gen::get_UUID(*bb))));
|
||||
auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(get_valid_timeuuid(*bb))));
|
||||
auto to_simple_date = get_castas_fctn(simple_date_type, timestamp_type);
|
||||
return {simple_date_type->decompose(to_simple_date(ts))};
|
||||
});
|
||||
@@ -211,7 +229,7 @@ make_timeuuidtotimestamp_fct() {
|
||||
if (!bb) {
|
||||
return {};
|
||||
}
|
||||
auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(UUID_gen::get_UUID(*bb))));
|
||||
auto ts = db_clock::time_point(db_clock::duration(UUID_gen::unix_timestamp(get_valid_timeuuid(*bb))));
|
||||
return {timestamp_type->decompose(ts)};
|
||||
});
|
||||
}
|
||||
@@ -245,10 +263,14 @@ make_timeuuidtounixtimestamp_fct() {
|
||||
if (!bb) {
|
||||
return {};
|
||||
}
|
||||
return {long_type->decompose(UUID_gen::unix_timestamp(UUID_gen::get_UUID(*bb)))};
|
||||
return {long_type->decompose(UUID_gen::unix_timestamp(get_valid_timeuuid(*bb)))};
|
||||
});
|
||||
}
|
||||
|
||||
inline bytes time_point_to_long(const data_value& v) {
|
||||
return data_value(get_valid_timestamp(v)).serialize();
|
||||
}
|
||||
|
||||
inline
|
||||
shared_ptr<function>
|
||||
make_timestamptounixtimestamp_fct() {
|
||||
@@ -263,7 +285,7 @@ make_timestamptounixtimestamp_fct() {
|
||||
if (ts_obj.is_null()) {
|
||||
return {};
|
||||
}
|
||||
return {long_type->decompose(ts_obj)};
|
||||
return time_point_to_long(ts_obj);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -282,7 +304,7 @@ make_datetounixtimestamp_fct() {
|
||||
return {};
|
||||
}
|
||||
auto from_simple_date = get_castas_fctn(timestamp_type, simple_date_type);
|
||||
return {long_type->decompose(from_simple_date(simple_date_obj))};
|
||||
return time_point_to_long(from_simple_date(simple_date_obj));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -390,28 +390,45 @@ std::vector<const column_definition*> statement_restrictions::get_column_defs_fo
|
||||
if (need_filtering()) {
|
||||
auto& sim = db.find_column_family(_schema).get_index_manager();
|
||||
auto [opt_idx, _] = find_idx(sim);
|
||||
auto column_uses_indexing = [&opt_idx] (const column_definition* cdef) {
|
||||
return opt_idx && opt_idx->depends_on(*cdef);
|
||||
auto column_uses_indexing = [&opt_idx] (const column_definition* cdef, ::shared_ptr<single_column_restriction> restr) {
|
||||
return opt_idx && restr && restr->is_supported_by(*opt_idx);
|
||||
};
|
||||
auto single_pk_restrs = dynamic_pointer_cast<single_column_partition_key_restrictions>(_partition_key_restrictions);
|
||||
if (_partition_key_restrictions->needs_filtering(*_schema)) {
|
||||
for (auto&& cdef : _partition_key_restrictions->get_column_defs()) {
|
||||
if (!column_uses_indexing(cdef)) {
|
||||
::shared_ptr<single_column_restriction> restr;
|
||||
if (single_pk_restrs) {
|
||||
auto it = single_pk_restrs->restrictions().find(cdef);
|
||||
if (it != single_pk_restrs->restrictions().end()) {
|
||||
restr = dynamic_pointer_cast<single_column_restriction>(it->second);
|
||||
}
|
||||
}
|
||||
if (!column_uses_indexing(cdef, restr)) {
|
||||
column_defs_for_filtering.emplace_back(cdef);
|
||||
}
|
||||
}
|
||||
}
|
||||
auto single_ck_restrs = dynamic_pointer_cast<single_column_clustering_key_restrictions>(_clustering_columns_restrictions);
|
||||
const bool pk_has_unrestricted_components = _partition_key_restrictions->has_unrestricted_components(*_schema);
|
||||
if (pk_has_unrestricted_components || _clustering_columns_restrictions->needs_filtering(*_schema)) {
|
||||
column_id first_filtering_id = pk_has_unrestricted_components ? 0 : _schema->clustering_key_columns().begin()->id +
|
||||
_clustering_columns_restrictions->num_prefix_columns_that_need_not_be_filtered();
|
||||
for (auto&& cdef : _clustering_columns_restrictions->get_column_defs()) {
|
||||
if (cdef->id >= first_filtering_id && !column_uses_indexing(cdef)) {
|
||||
::shared_ptr<single_column_restriction> restr;
|
||||
if (single_pk_restrs) {
|
||||
auto it = single_ck_restrs->restrictions().find(cdef);
|
||||
if (it != single_ck_restrs->restrictions().end()) {
|
||||
restr = dynamic_pointer_cast<single_column_restriction>(it->second);
|
||||
}
|
||||
}
|
||||
if (cdef->id >= first_filtering_id && !column_uses_indexing(cdef, restr)) {
|
||||
column_defs_for_filtering.emplace_back(cdef);
|
||||
}
|
||||
}
|
||||
}
|
||||
for (auto&& cdef : _nonprimary_key_restrictions->get_column_defs()) {
|
||||
if (!column_uses_indexing(cdef)) {
|
||||
auto restr = dynamic_pointer_cast<single_column_restriction>(_nonprimary_key_restrictions->get_restriction(*cdef));
|
||||
if (!column_uses_indexing(cdef, restr)) {
|
||||
column_defs_for_filtering.emplace_back(cdef);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -92,6 +92,14 @@ public:
|
||||
: abstract_function_selector(fun, std::move(arg_selectors))
|
||||
, _tfun(dynamic_pointer_cast<T>(fun)) {
|
||||
}
|
||||
|
||||
const functions::function_name& name() const {
|
||||
return _tfun->name();
|
||||
}
|
||||
|
||||
virtual sstring assignment_testable_source_context() const override {
|
||||
return format("{}", this->name());
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -79,11 +79,6 @@ public:
|
||||
dynamic_pointer_cast<functions::aggregate_function>(func), std::move(arg_selectors))
|
||||
, _aggregate(fun()->new_aggregate()) {
|
||||
}
|
||||
|
||||
virtual sstring assignment_testable_source_context() const override {
|
||||
// FIXME:
|
||||
return "FIXME";
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -82,12 +82,6 @@ public:
|
||||
: abstract_function_selector_for<functions::scalar_function>(
|
||||
dynamic_pointer_cast<functions::scalar_function>(std::move(fun)), std::move(arg_selectors)) {
|
||||
}
|
||||
|
||||
virtual sstring assignment_testable_source_context() const override {
|
||||
// FIXME:
|
||||
return "FIXME";
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -1316,7 +1316,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
|
||||
v.emplace_back(iovec{ buf.get_write(), s});
|
||||
m += s;
|
||||
}
|
||||
return f.dma_write(max_size - rem, std::move(v)).then([&rem](size_t s) {
|
||||
return f.dma_write(max_size - rem, std::move(v), service::get_local_commitlog_priority()).then([&rem](size_t s) {
|
||||
rem -= s;
|
||||
return stop_iteration::no;
|
||||
});
|
||||
|
||||
@@ -691,8 +691,9 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, shutdown_announce_in_ms(this, "shutdown_announce_in_ms", value_status::Used, 2 * 1000, "Time a node waits after sending gossip shutdown message in milliseconds. Same as -Dcassandra.shutdown_announce_in_ms in cassandra.")
|
||||
, developer_mode(this, "developer_mode", value_status::Used, false, "Relax environment checks. Setting to true can reduce performance and reliability significantly.")
|
||||
, skip_wait_for_gossip_to_settle(this, "skip_wait_for_gossip_to_settle", value_status::Used, -1, "An integer to configure the wait for gossip to settle. -1: wait normally, 0: do not wait at all, n: wait for at most n polls. Same as -Dcassandra.skip_wait_for_gossip_to_settle in cassandra.")
|
||||
, force_gossip_generation(this, "force_gossip_generation", liveness::LiveUpdate, value_status::Used, -1 , "Force gossip to use the generation number provided by user")
|
||||
, experimental(this, "experimental", value_status::Used, false, "Set to true to unlock all experimental features.")
|
||||
, experimental_features(this, "experimental_features", value_status::Used, {}, "Unlock experimental features provided as the option arguments (possible values: 'lwt', 'cdc', 'udf'). Can be repeated.")
|
||||
, experimental_features(this, "experimental_features", value_status::Used, {}, "Unlock experimental features provided as the option arguments (possible values: 'lwt' and 'cdc'). Can be repeated.")
|
||||
, lsa_reclamation_step(this, "lsa_reclamation_step", value_status::Used, 1, "Minimum number of segments to reclaim in a single step")
|
||||
, prometheus_port(this, "prometheus_port", value_status::Used, 9180, "Prometheus port, set to zero to disable")
|
||||
, prometheus_address(this, "prometheus_address", value_status::Used, "0.0.0.0", "Prometheus listening address")
|
||||
@@ -855,7 +856,7 @@ const db::extensions& db::config::extensions() const {
|
||||
std::unordered_map<sstring, db::experimental_features_t::feature> db::experimental_features_t::map() {
|
||||
// We decided against using the construct-on-first-use idiom here:
|
||||
// https://github.com/scylladb/scylla/pull/5369#discussion_r353614807
|
||||
return {{"lwt", LWT}, {"udf", UDF}, {"cdc", CDC}};
|
||||
return {{"lwt", LWT}, {"cdc", CDC}};
|
||||
}
|
||||
|
||||
template struct utils::config_file::named_value<seastar::log_level>;
|
||||
|
||||
@@ -78,7 +78,7 @@ namespace db {
|
||||
|
||||
/// Enumeration of all valid values for the `experimental` config entry.
|
||||
struct experimental_features_t {
|
||||
enum feature { LWT, UDF, CDC };
|
||||
enum feature { LWT, CDC };
|
||||
static std::unordered_map<sstring, feature> map(); // See enum_option.
|
||||
};
|
||||
|
||||
@@ -269,6 +269,7 @@ public:
|
||||
named_value<uint32_t> shutdown_announce_in_ms;
|
||||
named_value<bool> developer_mode;
|
||||
named_value<int32_t> skip_wait_for_gossip_to_settle;
|
||||
named_value<int32_t> force_gossip_generation;
|
||||
named_value<bool> experimental;
|
||||
named_value<std::vector<enum_option<experimental_features_t>>> experimental_features;
|
||||
named_value<size_t> lsa_reclamation_step;
|
||||
|
||||
@@ -405,11 +405,8 @@ future<> manager::end_point_hints_manager::sender::do_send_one_mutation(frozen_m
|
||||
return _proxy.send_to_endpoint(std::move(m), end_point_key(), { }, write_type::SIMPLE, service::allow_hints::no);
|
||||
} else {
|
||||
manager_logger.trace("Endpoints set has changed and {} is no longer a replica. Mutating from scratch...", end_point_key());
|
||||
// FIXME: using 1h as infinite timeout. If a node is down, we should get an
|
||||
// unavailable exception.
|
||||
auto timeout = db::timeout_clock::now() + 1h;
|
||||
//FIXME: Add required frozen_mutation overloads
|
||||
return _proxy.mutate({m.fm.unfreeze(m.s)}, consistency_level::ALL, timeout, nullptr, empty_service_permit());
|
||||
return _proxy.mutate_hint_from_scratch(std::move(m));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1748,7 +1748,7 @@ static void maybe_add_virtual_reader(schema_ptr s, database& db) {
|
||||
}
|
||||
|
||||
static bool maybe_write_in_user_memory(schema_ptr s, database& db) {
|
||||
return (s.get() == batchlog().get())
|
||||
return (s.get() == batchlog().get()) || (s.get() == paxos().get())
|
||||
|| s == v3::scylla_views_builds_in_progress();
|
||||
}
|
||||
|
||||
|
||||
@@ -307,7 +307,7 @@ deletable_row& view_updates::get_view_row(const partition_key& base_key, const c
|
||||
if (!cdef.is_computed()) {
|
||||
//FIXME(sarna): this legacy code is here for backward compatibility and should be removed
|
||||
// once "computed_columns feature" is supported by every node
|
||||
if (!service::get_local_storage_service().db().local().find_column_family(_base->id()).get_index_manager().is_index(*_base)) {
|
||||
if (!service::get_local_storage_service().db().local().find_column_family(_base->id()).get_index_manager().is_index(*_view)) {
|
||||
throw std::logic_error(format("Column {} doesn't exist in base and this view is not backing a secondary index", cdef.name_as_text()));
|
||||
}
|
||||
computed_value = token_column_computation().compute_value(*_base, base_key, update);
|
||||
@@ -879,7 +879,11 @@ future<stop_iteration> view_update_builder::on_results() {
|
||||
if (_update && !_update->is_end_of_partition()) {
|
||||
if (_update->is_clustering_row()) {
|
||||
apply_tracked_tombstones(_update_tombstone_tracker, _update->as_mutable_clustering_row());
|
||||
generate_update(std::move(*_update).as_clustering_row(), { });
|
||||
auto existing_tombstone = _existing_tombstone_tracker.current_tombstone();
|
||||
auto existing = existing_tombstone
|
||||
? std::optional<clustering_row>(std::in_place, _update->as_clustering_row().key(), row_tombstone(std::move(existing_tombstone)), row_marker(), ::row())
|
||||
: std::nullopt;
|
||||
generate_update(std::move(*_update).as_clustering_row(), std::move(existing));
|
||||
}
|
||||
return advance_updates();
|
||||
}
|
||||
|
||||
2
dist/debian/build_deb.sh
vendored
2
dist/debian/build_deb.sh
vendored
@@ -125,7 +125,7 @@ if [ -z "$TARGET" ]; then
|
||||
fi
|
||||
RELOC_PKG_FULLPATH=$(readlink -f $RELOC_PKG)
|
||||
RELOC_PKG_BASENAME=$(basename $RELOC_PKG)
|
||||
SCYLLA_VERSION=$(cat SCYLLA-VERSION-FILE)
|
||||
SCYLLA_VERSION=$(cat SCYLLA-VERSION-FILE | sed 's/\.rc/~rc/')
|
||||
SCYLLA_RELEASE=$(cat SCYLLA-RELEASE-FILE)
|
||||
|
||||
ln -fv $RELOC_PKG_FULLPATH ../$PRODUCT-server_$SCYLLA_VERSION-$SCYLLA_RELEASE.orig.tar.gz
|
||||
|
||||
1
dist/debian/debian/scylla-server.install
vendored
1
dist/debian/debian/scylla-server.install
vendored
@@ -4,7 +4,6 @@ etc/security/limits.d/scylla.conf
|
||||
etc/scylla.d/*.conf
|
||||
opt/scylladb/share/doc/scylla/*
|
||||
opt/scylladb/share/doc/scylla/licenses/
|
||||
usr/lib/systemd/system/*.service
|
||||
usr/lib/systemd/system/*.timer
|
||||
usr/lib/systemd/system/*.slice
|
||||
usr/bin/scylla
|
||||
|
||||
6
dist/debian/debian/scylla-server.postrm
vendored
6
dist/debian/debian/scylla-server.postrm
vendored
@@ -6,8 +6,12 @@ case "$1" in
|
||||
purge|remove)
|
||||
rm -rf /etc/systemd/system/scylla-housekeeping-daily.service.d/
|
||||
rm -rf /etc/systemd/system/scylla-housekeeping-restart.service.d/
|
||||
rm -rf /etc/systemd/system/scylla-server.service.d/
|
||||
rm -rf /etc/systemd/system/scylla-helper.slice.d/
|
||||
# We need to keep dependencies.conf and sysconfdir.conf on 'remove',
|
||||
# otherwise it will be missing after rollback.
|
||||
if [ "$1" = "purge" ]; then
|
||||
rm -rf /etc/systemd/system/scylla-server.service.d/
|
||||
fi
|
||||
;;
|
||||
esac
|
||||
|
||||
|
||||
4
dist/redhat/scylla.spec.mustache
vendored
4
dist/redhat/scylla.spec.mustache
vendored
@@ -17,6 +17,10 @@ Obsoletes: scylla-server < 1.1
|
||||
|
||||
%undefine _find_debuginfo_dwz_opts
|
||||
|
||||
# Prevent find-debuginfo.sh from tempering with scylla's build-id (#5881)
|
||||
%undefine _unique_build_ids
|
||||
%global _no_recompute_build_ids 1
|
||||
|
||||
%description
|
||||
Scylla is a highly scalable, eventually consistent, distributed,
|
||||
partitioned row DB.
|
||||
|
||||
@@ -76,6 +76,9 @@ Scylla with issue #4139 fixed)
|
||||
bit 4: CorrectEmptyCounters (if set, indicates the sstable was generated by
|
||||
Scylla with issue #4363 fixed)
|
||||
|
||||
bit 5: CorrectUDTsInCollections (if set, indicates that the sstable was generated
|
||||
by Scylla with issue #6130 fixed)
|
||||
|
||||
## extension_attributes subcomponent
|
||||
|
||||
extension_attributes = extension_attribute_count extension_attribute*
|
||||
|
||||
@@ -98,6 +98,13 @@ public:
|
||||
sstring get_message() const { return what(); }
|
||||
};
|
||||
|
||||
class server_exception : public cassandra_exception {
|
||||
public:
|
||||
server_exception(sstring msg) noexcept
|
||||
: exceptions::cassandra_exception{exceptions::exception_code::SERVER_ERROR, std::move(msg)}
|
||||
{ }
|
||||
};
|
||||
|
||||
class protocol_exception : public cassandra_exception {
|
||||
public:
|
||||
protocol_exception(sstring msg) noexcept
|
||||
|
||||
@@ -1622,11 +1622,15 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map<application_stat
|
||||
// message on all cpus and forard them to cpu0 to process.
|
||||
return get_gossiper().invoke_on_all([do_bind] (gossiper& g) {
|
||||
g.init_messaging_service_handler(do_bind);
|
||||
}).then([this, generation_nbr, preload_local_states] {
|
||||
}).then([this, generation_nbr, preload_local_states] () mutable {
|
||||
build_seeds_list();
|
||||
/* initialize the heartbeat state for this localEndpoint */
|
||||
maybe_initialize_local_state(generation_nbr);
|
||||
if (_cfg.force_gossip_generation() > 0) {
|
||||
generation_nbr = _cfg.force_gossip_generation();
|
||||
logger.warn("Use the generation number provided by user: generation = {}", generation_nbr);
|
||||
}
|
||||
endpoint_state& local_state = endpoint_state_map[get_broadcast_address()];
|
||||
local_state.set_heart_beat_state_and_update_timestamp(heart_beat_state(generation_nbr));
|
||||
local_state.mark_alive();
|
||||
for (auto& entry : preload_local_states) {
|
||||
local_state.add_application_state(entry.first, entry.second);
|
||||
}
|
||||
@@ -1831,7 +1835,8 @@ future<> gossiper::do_stop_gossiping() {
|
||||
if (my_ep_state && !is_silent_shutdown_state(*my_ep_state)) {
|
||||
logger.info("Announcing shutdown");
|
||||
add_local_application_state(application_state::STATUS, _value_factory.shutdown(true)).get();
|
||||
for (inet_address addr : _live_endpoints) {
|
||||
auto live_endpoints = _live_endpoints;
|
||||
for (inet_address addr : live_endpoints) {
|
||||
msg_addr id = get_msg_addr(addr);
|
||||
logger.trace("Sending a GossipShutdown to {}", id);
|
||||
ms().send_gossip_shutdown(id, get_broadcast_address()).then_wrapped([id] (auto&&f) {
|
||||
|
||||
@@ -53,13 +53,13 @@ std::vector<inet_address> simple_strategy::calculate_natural_endpoints(const tok
|
||||
endpoints.reserve(replicas);
|
||||
|
||||
for (auto& token : tm.ring_range(t)) {
|
||||
if (endpoints.size() == replicas) {
|
||||
break;
|
||||
}
|
||||
auto ep = tm.get_endpoint(token);
|
||||
assert(ep);
|
||||
|
||||
endpoints.push_back(*ep);
|
||||
if (endpoints.size() == replicas) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return std::move(endpoints.get_vector());
|
||||
|
||||
10
main.cc
10
main.cc
@@ -54,6 +54,7 @@
|
||||
#include <seastar/core/file.hh>
|
||||
#include <sys/time.h>
|
||||
#include <sys/resource.h>
|
||||
#include <sys/prctl.h>
|
||||
#include "disk-error-handler.hh"
|
||||
#include "tracing/tracing.hh"
|
||||
#include "tracing/tracing_backend_registry.hh"
|
||||
@@ -464,6 +465,15 @@ inline auto defer_with_log_on_error(Func&& func) {
|
||||
}
|
||||
|
||||
int main(int ac, char** av) {
|
||||
// Allow core dumps. The would be disabled by default if
|
||||
// CAP_SYS_NICE was added to the binary, as is suggested by the
|
||||
// epoll backend.
|
||||
int r = prctl(PR_SET_DUMPABLE, 1, 0, 0, 0);
|
||||
if (r) {
|
||||
std::cerr << "Could not make scylla dumpable\n";
|
||||
exit(1);
|
||||
}
|
||||
|
||||
int return_value = 0;
|
||||
try {
|
||||
// early check to avoid triggering
|
||||
|
||||
@@ -39,6 +39,9 @@
|
||||
#include <seastar/core/execution_stage.hh>
|
||||
#include "types/map.hh"
|
||||
#include "compaction_garbage_collector.hh"
|
||||
#include "utils/exceptions.hh"
|
||||
|
||||
logging::logger mplog("mutation_partition");
|
||||
|
||||
template<bool reversed>
|
||||
struct reversal_traits;
|
||||
@@ -1236,7 +1239,9 @@ row::apply_monotonically(const column_definition& column, atomic_cell_or_collect
|
||||
void
|
||||
row::append_cell(column_id id, atomic_cell_or_collection value) {
|
||||
if (_type == storage_type::vector && id < max_vector_size) {
|
||||
assert(_storage.vector.v.size() <= id);
|
||||
if (_storage.vector.v.size() > id) {
|
||||
on_internal_error(mplog, format("Attempted to append cell#{} to row already having {} cells", id, _storage.vector.v.size()));
|
||||
}
|
||||
_storage.vector.v.resize(id);
|
||||
_storage.vector.v.emplace_back(cell_and_hash{std::move(value), cell_hash_opt()});
|
||||
_storage.vector.present.set(id);
|
||||
|
||||
@@ -177,6 +177,13 @@ future<> multishard_writer::distribute_mutation_fragments() {
|
||||
return handle_end_of_stream();
|
||||
}
|
||||
});
|
||||
}).handle_exception([this] (std::exception_ptr ep) {
|
||||
for (auto& q : _queue_reader_handles) {
|
||||
if (q) {
|
||||
q->abort(ep);
|
||||
}
|
||||
}
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -444,10 +444,14 @@ class repair_writer {
|
||||
uint64_t _estimated_partitions;
|
||||
size_t _nr_peer_nodes;
|
||||
// Needs more than one for repair master
|
||||
std::vector<std::optional<future<uint64_t>>> _writer_done;
|
||||
std::vector<std::optional<future<>>> _writer_done;
|
||||
std::vector<std::optional<seastar::queue<mutation_fragment_opt>>> _mq;
|
||||
// Current partition written to disk
|
||||
std::vector<lw_shared_ptr<const decorated_key_with_hash>> _current_dk_written_to_sstable;
|
||||
// Is current partition still open. A partition is opened when a
|
||||
// partition_start is written and is closed when a partition_end is
|
||||
// written.
|
||||
std::vector<bool> _partition_opened;
|
||||
public:
|
||||
repair_writer(
|
||||
schema_ptr schema,
|
||||
@@ -462,10 +466,13 @@ public:
|
||||
future<> write_start_and_mf(lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf, unsigned node_idx) {
|
||||
_current_dk_written_to_sstable[node_idx] = dk;
|
||||
if (mf.is_partition_start()) {
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf)));
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf))).then([this, node_idx] {
|
||||
_partition_opened[node_idx] = true;
|
||||
});
|
||||
} else {
|
||||
auto start = mutation_fragment(partition_start(dk->dk, tombstone()));
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(start))).then([this, node_idx, mf = std::move(mf)] () mutable {
|
||||
_partition_opened[node_idx] = true;
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf)));
|
||||
});
|
||||
}
|
||||
@@ -475,6 +482,7 @@ public:
|
||||
_writer_done.resize(_nr_peer_nodes);
|
||||
_mq.resize(_nr_peer_nodes);
|
||||
_current_dk_written_to_sstable.resize(_nr_peer_nodes);
|
||||
_partition_opened.resize(_nr_peer_nodes, false);
|
||||
}
|
||||
|
||||
void create_writer(unsigned node_idx) {
|
||||
@@ -516,7 +524,24 @@ public:
|
||||
return consumer(std::move(reader));
|
||||
});
|
||||
},
|
||||
t.stream_in_progress());
|
||||
t.stream_in_progress()).then([this, node_idx] (uint64_t partitions) {
|
||||
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
|
||||
_schema->ks_name(), _schema->cf_name(), partitions);
|
||||
}).handle_exception([this, node_idx] (std::exception_ptr ep) {
|
||||
rlogger.warn("repair_writer: keyspace={}, table={}, multishard_writer failed: {}",
|
||||
_schema->ks_name(), _schema->cf_name(), ep);
|
||||
_mq[node_idx]->abort(ep);
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
|
||||
future<> write_partition_end(unsigned node_idx) {
|
||||
if (_partition_opened[node_idx]) {
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment(partition_end())).then([this, node_idx] {
|
||||
_partition_opened[node_idx] = false;
|
||||
});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> do_write(unsigned node_idx, lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf) {
|
||||
@@ -524,7 +549,7 @@ public:
|
||||
if (_current_dk_written_to_sstable[node_idx]->dk.equal(*_schema, dk->dk)) {
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt(std::move(mf)));
|
||||
} else {
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment(partition_end())).then([this,
|
||||
return write_partition_end(node_idx).then([this,
|
||||
node_idx, dk = std::move(dk), mf = std::move(mf)] () mutable {
|
||||
return write_start_and_mf(std::move(dk), std::move(mf), node_idx);
|
||||
});
|
||||
@@ -534,21 +559,33 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
future<> write_end_of_stream(unsigned node_idx) {
|
||||
if (_mq[node_idx]) {
|
||||
// Partition_end is never sent on wire, so we have to write one ourselves.
|
||||
return write_partition_end(node_idx).then([this, node_idx] () mutable {
|
||||
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt());
|
||||
});
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
}
|
||||
|
||||
future<> do_wait_for_writer_done(unsigned node_idx) {
|
||||
if (_writer_done[node_idx]) {
|
||||
return std::move(*(_writer_done[node_idx]));
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
}
|
||||
|
||||
future<> wait_for_writer_done() {
|
||||
return parallel_for_each(boost::irange(unsigned(0), unsigned(_nr_peer_nodes)), [this] (unsigned node_idx) {
|
||||
if (_writer_done[node_idx] && _mq[node_idx]) {
|
||||
// Partition_end is never sent on wire, so we have to write one ourselves.
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment(partition_end())).then([this, node_idx] () mutable {
|
||||
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt()).then([this, node_idx] () mutable {
|
||||
return (*_writer_done[node_idx]).then([] (uint64_t partitions) {
|
||||
rlogger.debug("Managed to write partitions={} to sstable", partitions);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
return when_all_succeed(write_end_of_stream(node_idx), do_wait_for_writer_done(node_idx));
|
||||
}).handle_exception([this] (std::exception_ptr ep) {
|
||||
rlogger.warn("repair_writer: keyspace={}, table={}, wait_for_writer_done failed: {}",
|
||||
_schema->ks_name(), _schema->cf_name(), ep);
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
};
|
||||
@@ -2174,7 +2211,7 @@ class row_level_repair {
|
||||
|
||||
// If the total size of the `_row_buf` on either of the nodes is zero,
|
||||
// we set this flag, which is an indication that rows are not synced.
|
||||
bool _zero_rows;
|
||||
bool _zero_rows = false;
|
||||
|
||||
// Sum of estimated_partitions on all peers
|
||||
uint64_t _estimated_partitions = 0;
|
||||
|
||||
@@ -288,10 +288,10 @@ schema::schema(const raw_schema& raw, std::optional<raw_view_info> raw_view_info
|
||||
+ column_offset(column_kind::regular_column),
|
||||
_raw._columns.end(), column_definition::name_comparator(regular_column_name_type()));
|
||||
|
||||
std::sort(_raw._columns.begin(),
|
||||
std::stable_sort(_raw._columns.begin(),
|
||||
_raw._columns.begin() + column_offset(column_kind::clustering_key),
|
||||
[] (auto x, auto y) { return x.id < y.id; });
|
||||
std::sort(_raw._columns.begin() + column_offset(column_kind::clustering_key),
|
||||
std::stable_sort(_raw._columns.begin() + column_offset(column_kind::clustering_key),
|
||||
_raw._columns.begin() + column_offset(column_kind::static_column),
|
||||
[] (auto x, auto y) { return x.id < y.id; });
|
||||
|
||||
|
||||
@@ -58,7 +58,8 @@ EOS
|
||||
# For systems with not a lot of memory, override default reservations for the slices
|
||||
# seastar has a minimum reservation of 1.5GB that kicks in, and 21GB * 0.07 = 1.5GB.
|
||||
# So for anything smaller than that we will not use percentages in the helper slice
|
||||
MEMTOTAL_BYTES=$(cat /proc/meminfo | grep MemTotal | awk '{print $2 * 1024}')
|
||||
MEMTOTAL=$(cat /proc/meminfo |grep -e "^MemTotal:"|sed -s 's/^MemTotal:\s*\([0-9]*\) kB$/\1/')
|
||||
MEMTOTAL_BYTES=$(($MEMTOTAL * 1024))
|
||||
if [ $MEMTOTAL_BYTES -lt 23008753371 ]; then
|
||||
mkdir -p /etc/systemd/system/scylla-helper.slice.d/
|
||||
cat << EOS > /etc/systemd/system/scylla-helper.slice.d/memory.conf
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 8e236efda9...c8668e98bd
@@ -38,7 +38,12 @@ private:
|
||||
public:
|
||||
query_state(client_state& client_state, service_permit permit)
|
||||
: _client_state(client_state)
|
||||
, _trace_state_ptr(_client_state.get_trace_state())
|
||||
, _trace_state_ptr(tracing::trace_state_ptr())
|
||||
, _permit(std::move(permit))
|
||||
{ }
|
||||
query_state(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit)
|
||||
: _client_state(client_state)
|
||||
, _trace_state_ptr(std::move(trace_state))
|
||||
, _permit(std::move(permit))
|
||||
{ }
|
||||
|
||||
|
||||
@@ -2183,6 +2183,14 @@ future<> storage_proxy::send_to_endpoint(
|
||||
allow_hints);
|
||||
}
|
||||
|
||||
future<> storage_proxy::mutate_hint_from_scratch(frozen_mutation_and_schema fm_a_s) {
|
||||
// FIXME: using 1h as infinite timeout. If a node is down, we should get an
|
||||
// unavailable exception.
|
||||
const auto timeout = db::timeout_clock::now() + 1h;
|
||||
std::array<mutation, 1> ms{fm_a_s.fm.unfreeze(fm_a_s.s)};
|
||||
return mutate_internal(std::move(ms), db::consistency_level::ALL, false, nullptr, empty_service_permit(), timeout);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send the mutations to the right targets, write it locally if it corresponds or writes a hint when the node
|
||||
* is not available.
|
||||
@@ -3935,7 +3943,7 @@ storage_proxy::do_query_with_paxos(schema_ptr s,
|
||||
return make_ready_future<storage_proxy::coordinator_query_result>(f.get0());
|
||||
} catch (request_timeout_exception& ex) {
|
||||
_stats.cas_read_timeouts.mark();
|
||||
return make_exception_future<storage_proxy::coordinator_query_result>(std::move(ex));
|
||||
return make_exception_future<storage_proxy::coordinator_query_result>(std::current_exception());
|
||||
} catch (exceptions::unavailable_exception& ex) {
|
||||
_stats.cas_read_unavailables.mark();
|
||||
return make_exception_future<storage_proxy::coordinator_query_result>(std::move(ex));
|
||||
@@ -4062,7 +4070,7 @@ future<bool> storage_proxy::cas(schema_ptr schema, shared_ptr<cas_request> reque
|
||||
return make_ready_future<bool>(f.get0());
|
||||
} catch (request_timeout_exception& ex) {
|
||||
_stats.cas_write_timeouts.mark();
|
||||
return make_exception_future<bool>(std::move(ex));
|
||||
return make_exception_future<bool>(std::current_exception());
|
||||
} catch (exceptions::unavailable_exception& ex) {
|
||||
_stats.cas_write_unavailables.mark();
|
||||
return make_exception_future<bool>(std::move(ex));
|
||||
|
||||
@@ -459,6 +459,8 @@ public:
|
||||
*/
|
||||
future<> mutate_atomically(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit);
|
||||
|
||||
future<> mutate_hint_from_scratch(frozen_mutation_and_schema fm_a_s);
|
||||
|
||||
// Send a mutation to one specific remote target.
|
||||
// Inspired by Cassandra's StorageProxy.sendToHintedEndpoints but without
|
||||
// hinted handoff support, and just one target. See also
|
||||
|
||||
@@ -1440,7 +1440,8 @@ future<> storage_service::drain_on_shutdown() {
|
||||
ss._sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::stop).get();
|
||||
slogger.info("Drain on shutdown: system distributed keyspace stopped");
|
||||
|
||||
get_storage_proxy().invoke_on_all([&ss] (storage_proxy& local_proxy) mutable {
|
||||
get_storage_proxy().invoke_on_all([] (storage_proxy& local_proxy) mutable {
|
||||
auto& ss = service::get_local_storage_service();
|
||||
ss.unregister_subscriber(&local_proxy);
|
||||
return local_proxy.drain_on_shutdown();
|
||||
}).get();
|
||||
|
||||
@@ -72,47 +72,8 @@ private:
|
||||
static std::vector<column_info> build(
|
||||
const schema& s,
|
||||
const utils::chunked_vector<serialization_header::column_desc>& src,
|
||||
bool is_static) {
|
||||
std::vector<column_info> cols;
|
||||
if (s.is_dense()) {
|
||||
const column_definition& col = is_static ? *s.static_begin() : *s.regular_begin();
|
||||
cols.push_back(column_info{
|
||||
&col.name(),
|
||||
col.type,
|
||||
col.id,
|
||||
col.type->value_length_if_fixed(),
|
||||
col.is_multi_cell(),
|
||||
col.is_counter(),
|
||||
false
|
||||
});
|
||||
} else {
|
||||
cols.reserve(src.size());
|
||||
for (auto&& desc : src) {
|
||||
const bytes& type_name = desc.type_name.value;
|
||||
data_type type = db::marshal::type_parser::parse(to_sstring_view(type_name));
|
||||
const column_definition* def = s.get_column_definition(desc.name.value);
|
||||
std::optional<column_id> id;
|
||||
bool schema_mismatch = false;
|
||||
if (def) {
|
||||
id = def->id;
|
||||
schema_mismatch = def->is_multi_cell() != type->is_multi_cell() ||
|
||||
def->is_counter() != type->is_counter() ||
|
||||
!def->type->is_value_compatible_with(*type);
|
||||
}
|
||||
cols.push_back(column_info{
|
||||
&desc.name.value,
|
||||
type,
|
||||
id,
|
||||
type->value_length_if_fixed(),
|
||||
type->is_multi_cell(),
|
||||
type->is_counter(),
|
||||
schema_mismatch
|
||||
});
|
||||
}
|
||||
boost::range::stable_partition(cols, [](const column_info& column) { return !column.is_collection; });
|
||||
}
|
||||
return cols;
|
||||
}
|
||||
const sstable_enabled_features& features,
|
||||
bool is_static);
|
||||
|
||||
utils::UUID schema_uuid;
|
||||
std::vector<column_info> regular_schema_columns_from_sstable;
|
||||
@@ -125,10 +86,10 @@ private:
|
||||
state(state&&) = default;
|
||||
state& operator=(state&&) = default;
|
||||
|
||||
state(const schema& s, const serialization_header& header)
|
||||
state(const schema& s, const serialization_header& header, const sstable_enabled_features& features)
|
||||
: schema_uuid(s.version())
|
||||
, regular_schema_columns_from_sstable(build(s, header.regular_columns.elements, false))
|
||||
, static_schema_columns_from_sstable(build(s, header.static_columns.elements, true))
|
||||
, regular_schema_columns_from_sstable(build(s, header.regular_columns.elements, features, false))
|
||||
, static_schema_columns_from_sstable(build(s, header.static_columns.elements, features, true))
|
||||
, clustering_column_value_fix_lengths (get_clustering_values_fixed_lengths(header))
|
||||
{}
|
||||
};
|
||||
@@ -136,9 +97,10 @@ private:
|
||||
lw_shared_ptr<const state> _state = make_lw_shared<const state>();
|
||||
|
||||
public:
|
||||
column_translation get_for_schema(const schema& s, const serialization_header& header) {
|
||||
column_translation get_for_schema(
|
||||
const schema& s, const serialization_header& header, const sstable_enabled_features& features) {
|
||||
if (s.version() != _state->schema_uuid) {
|
||||
_state = make_lw_shared(state(s, header));
|
||||
_state = make_lw_shared(state(s, header, features));
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
@@ -38,6 +38,8 @@
|
||||
*/
|
||||
|
||||
#include "mp_row_consumer.hh"
|
||||
#include "column_translation.hh"
|
||||
#include "concrete_types.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
@@ -79,4 +81,86 @@ atomic_cell make_counter_cell(api::timestamp_type timestamp, bytes_view value) {
|
||||
return ccb.build(timestamp);
|
||||
}
|
||||
|
||||
// See #6130.
|
||||
static data_type freeze_types_in_collections(data_type t) {
|
||||
return ::visit(*t, make_visitor(
|
||||
[] (const map_type_impl& typ) -> data_type {
|
||||
return map_type_impl::get_instance(
|
||||
freeze_types_in_collections(typ.get_keys_type()->freeze()),
|
||||
freeze_types_in_collections(typ.get_values_type()->freeze()),
|
||||
typ.is_multi_cell());
|
||||
},
|
||||
[] (const set_type_impl& typ) -> data_type {
|
||||
return set_type_impl::get_instance(
|
||||
freeze_types_in_collections(typ.get_elements_type()->freeze()),
|
||||
typ.is_multi_cell());
|
||||
},
|
||||
[] (const list_type_impl& typ) -> data_type {
|
||||
return list_type_impl::get_instance(
|
||||
freeze_types_in_collections(typ.get_elements_type()->freeze()),
|
||||
typ.is_multi_cell());
|
||||
},
|
||||
[&] (const abstract_type& typ) -> data_type {
|
||||
return std::move(t);
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
/* If this function returns false, the caller cannot assume that the SSTable comes from Scylla.
|
||||
* It might, if for some reason a table was created using Scylla that didn't contain any feature bit,
|
||||
* but that should never happen. */
|
||||
static bool is_certainly_scylla_sstable(const sstable_enabled_features& features) {
|
||||
return features.enabled_features;
|
||||
}
|
||||
|
||||
std::vector<column_translation::column_info> column_translation::state::build(
|
||||
const schema& s,
|
||||
const utils::chunked_vector<serialization_header::column_desc>& src,
|
||||
const sstable_enabled_features& features,
|
||||
bool is_static) {
|
||||
std::vector<column_info> cols;
|
||||
if (s.is_dense()) {
|
||||
const column_definition& col = is_static ? *s.static_begin() : *s.regular_begin();
|
||||
cols.push_back(column_info{
|
||||
&col.name(),
|
||||
col.type,
|
||||
col.id,
|
||||
col.type->value_length_if_fixed(),
|
||||
col.is_multi_cell(),
|
||||
col.is_counter(),
|
||||
false
|
||||
});
|
||||
} else {
|
||||
cols.reserve(src.size());
|
||||
for (auto&& desc : src) {
|
||||
const bytes& type_name = desc.type_name.value;
|
||||
data_type type = db::marshal::type_parser::parse(to_sstring_view(type_name));
|
||||
if (!features.is_enabled(CorrectUDTsInCollections) && is_certainly_scylla_sstable(features)) {
|
||||
// See #6130.
|
||||
type = freeze_types_in_collections(std::move(type));
|
||||
}
|
||||
const column_definition* def = s.get_column_definition(desc.name.value);
|
||||
std::optional<column_id> id;
|
||||
bool schema_mismatch = false;
|
||||
if (def) {
|
||||
id = def->id;
|
||||
schema_mismatch = def->is_multi_cell() != type->is_multi_cell() ||
|
||||
def->is_counter() != type->is_counter() ||
|
||||
!def->type->is_value_compatible_with(*type);
|
||||
}
|
||||
cols.push_back(column_info{
|
||||
&desc.name.value,
|
||||
type,
|
||||
id,
|
||||
type->value_length_if_fixed(),
|
||||
type->is_multi_cell(),
|
||||
type->is_counter(),
|
||||
schema_mismatch
|
||||
});
|
||||
}
|
||||
boost::range::stable_partition(cols, [](const column_info& column) { return !column.is_collection; });
|
||||
}
|
||||
return cols;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1344,7 +1344,7 @@ public:
|
||||
, _consumer(consumer)
|
||||
, _sst(sst)
|
||||
, _header(sst->get_serialization_header())
|
||||
, _column_translation(sst->get_column_translation(s, _header))
|
||||
, _column_translation(sst->get_column_translation(s, _header, sst->features()))
|
||||
, _has_shadowable_tombstones(sst->has_shadowable_tombstones())
|
||||
{
|
||||
setup_columns(_regular_row, _column_translation.regular_columns());
|
||||
|
||||
@@ -780,8 +780,9 @@ public:
|
||||
const serialization_header& get_serialization_header() const {
|
||||
return get_mutable_serialization_header(*_components);
|
||||
}
|
||||
column_translation get_column_translation(const schema& s, const serialization_header& h) {
|
||||
return _column_translation.get_for_schema(s, h);
|
||||
column_translation get_column_translation(
|
||||
const schema& s, const serialization_header& h, const sstable_enabled_features& f) {
|
||||
return _column_translation.get_for_schema(s, h, f);
|
||||
}
|
||||
const std::vector<unsigned>& get_shards_for_this_sstable() const {
|
||||
return _shards;
|
||||
|
||||
@@ -459,7 +459,8 @@ enum sstable_feature : uint8_t {
|
||||
ShadowableTombstones = 2, // See #3885
|
||||
CorrectStaticCompact = 3, // See #4139
|
||||
CorrectEmptyCounters = 4, // See #4363
|
||||
End = 5,
|
||||
CorrectUDTsInCollections = 5, // See #6130
|
||||
End = 6,
|
||||
};
|
||||
|
||||
// Scylla-specific features enabled for a particular sstable.
|
||||
|
||||
@@ -77,3 +77,45 @@ BOOST_AUTO_TEST_CASE(test_make_random_uuid) {
|
||||
std::sort(uuids.begin(), uuids.end());
|
||||
BOOST_CHECK(std::unique(uuids.begin(), uuids.end()) == uuids.end());
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_get_time_uuid) {
|
||||
using namespace std::chrono;
|
||||
|
||||
auto uuid = utils::UUID_gen::get_time_UUID();
|
||||
BOOST_CHECK(uuid.is_timestamp());
|
||||
|
||||
auto tp = system_clock::now();
|
||||
uuid = utils::UUID_gen::get_time_UUID(tp);
|
||||
BOOST_CHECK(uuid.is_timestamp());
|
||||
|
||||
auto millis = duration_cast<milliseconds>(tp.time_since_epoch()).count();
|
||||
uuid = utils::UUID_gen::get_time_UUID(millis);
|
||||
BOOST_CHECK(uuid.is_timestamp());
|
||||
|
||||
auto unix_timestamp = utils::UUID_gen::unix_timestamp(uuid);
|
||||
BOOST_CHECK(unix_timestamp == millis);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_min_time_uuid) {
|
||||
using namespace std::chrono;
|
||||
|
||||
auto tp = system_clock::now();
|
||||
auto millis = duration_cast<milliseconds>(tp.time_since_epoch()).count();
|
||||
auto uuid = utils::UUID_gen::min_time_UUID(millis);
|
||||
BOOST_CHECK(uuid.is_timestamp());
|
||||
|
||||
auto unix_timestamp = utils::UUID_gen::unix_timestamp(uuid);
|
||||
BOOST_CHECK(unix_timestamp == millis);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_max_time_uuid) {
|
||||
using namespace std::chrono;
|
||||
|
||||
auto tp = system_clock::now();
|
||||
auto millis = duration_cast<milliseconds>(tp.time_since_epoch()).count();
|
||||
auto uuid = utils::UUID_gen::max_time_UUID(millis);
|
||||
BOOST_CHECK(uuid.is_timestamp());
|
||||
|
||||
auto unix_timestamp = utils::UUID_gen::unix_timestamp(uuid);
|
||||
BOOST_CHECK(unix_timestamp == millis);
|
||||
}
|
||||
|
||||
@@ -933,7 +933,6 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_cdc) {
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::CDC});
|
||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -943,17 +942,6 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_lwt) {
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::LWT});
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_parse_experimental_features_udf) {
|
||||
config cfg;
|
||||
cfg.read_from_yaml("experimental_features:\n - udf\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UDF});
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -963,7 +951,6 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_multiple) {
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::CDC, ef::LWT, ef::CDC}));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -976,7 +963,6 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_invalid) {
|
||||
BOOST_REQUIRE_NE(msg.find("line 2, column 7"), msg.npos);
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
});
|
||||
return make_ready_future();
|
||||
}
|
||||
@@ -986,7 +972,6 @@ SEASTAR_TEST_CASE(test_parse_experimental_true) {
|
||||
cfg.read_from_yaml("experimental: true", throw_on_error);
|
||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -995,6 +980,5 @@ SEASTAR_TEST_CASE(test_parse_experimental_false) {
|
||||
cfg.read_from_yaml("experimental: false", throw_on_error);
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -4263,3 +4263,272 @@ SEASTAR_TEST_CASE(test_rf_expand) {
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Test that tombstones with future timestamps work correctly
|
||||
// when a write with lower timestamp arrives - in such case,
|
||||
// if the base row is covered by such a tombstone, a view update
|
||||
// needs to take it into account. Refs #5793
|
||||
SEASTAR_TEST_CASE(test_views_with_future_tombstones) {
|
||||
return do_with_cql_env_thread([] (auto& e) {
|
||||
cquery_nofail(e, "CREATE TABLE t (a int, b int, c int, d int, e int, PRIMARY KEY (a,b,c));");
|
||||
cquery_nofail(e, "CREATE MATERIALIZED VIEW tv AS SELECT * FROM t"
|
||||
" WHERE a IS NOT NULL AND b IS NOT NULL AND c IS NOT NULL PRIMARY KEY (b,a,c);");
|
||||
|
||||
// Partition tombstone
|
||||
cquery_nofail(e, "delete from t using timestamp 10 where a=1;");
|
||||
auto msg = cquery_nofail(e, "select * from t;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
cquery_nofail(e, "insert into t (a,b,c,d,e) values (1,2,3,4,5) using timestamp 8;");
|
||||
msg = cquery_nofail(e, "select * from t;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
msg = cquery_nofail(e, "select * from tv;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
|
||||
// Range tombstone
|
||||
cquery_nofail(e, "delete from t using timestamp 16 where a=2 and b > 1 and b < 4;");
|
||||
msg = cquery_nofail(e, "select * from t;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
cquery_nofail(e, "insert into t (a,b,c,d,e) values (2,3,4,5,6) using timestamp 12;");
|
||||
msg = cquery_nofail(e, "select * from t;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
msg = cquery_nofail(e, "select * from tv;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
|
||||
// Row tombstone
|
||||
cquery_nofail(e, "delete from t using timestamp 24 where a=3 and b=4 and c=5;");
|
||||
msg = cquery_nofail(e, "select * from t;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
cquery_nofail(e, "insert into t (a,b,c,d,e) values (3,4,5,6,7) using timestamp 18;");
|
||||
msg = cquery_nofail(e, "select * from t;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
msg = cquery_nofail(e, "select * from tv;");
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
});
|
||||
}
|
||||
|
||||
shared_ptr<cql_transport::messages::result_message> cql_func_require_nofail(
|
||||
cql_test_env& env,
|
||||
const seastar::sstring& fct,
|
||||
const seastar::sstring& inp,
|
||||
std::unique_ptr<cql3::query_options>&& qo = nullptr,
|
||||
const std::experimental::source_location& loc = std::experimental::source_location::current()) {
|
||||
auto res = shared_ptr<cql_transport::messages::result_message>(nullptr);
|
||||
auto query = format("SELECT {}({}) FROM t;", fct, inp);
|
||||
try {
|
||||
if (qo) {
|
||||
res = env.execute_cql(query, std::move(qo)).get0();
|
||||
} else {
|
||||
res = env.execute_cql(query).get0();
|
||||
}
|
||||
BOOST_TEST_MESSAGE(format("Query '{}' succeeded as expected", query));
|
||||
} catch (...) {
|
||||
BOOST_ERROR(format("query '{}' failed unexpectedly with error: {}\n{}:{}: originally from here",
|
||||
query, std::current_exception(),
|
||||
loc.file_name(), loc.line()));
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
// FIXME: should be in cql_assertions, but we don't want to call boost from cql_assertions.hh
|
||||
template <typename Exception>
|
||||
void cql_func_require_throw(
|
||||
cql_test_env& env,
|
||||
const seastar::sstring& fct,
|
||||
const seastar::sstring& inp,
|
||||
std::unique_ptr<cql3::query_options>&& qo = nullptr,
|
||||
const std::experimental::source_location& loc = std::experimental::source_location::current()) {
|
||||
auto query = format("SELECT {}({}) FROM t;", fct, inp);
|
||||
try {
|
||||
if (qo) {
|
||||
env.execute_cql(query, std::move(qo)).get();
|
||||
} else {
|
||||
env.execute_cql(query).get();
|
||||
}
|
||||
BOOST_ERROR(format("query '{}' succeeded unexpectedly\n{}:{}: originally from here", query,
|
||||
loc.file_name(), loc.line()));
|
||||
} catch (Exception& e) {
|
||||
BOOST_TEST_MESSAGE(format("Query '{}' failed as expected with error: {}", query, e));
|
||||
} catch (...) {
|
||||
BOOST_ERROR(format("query '{}' failed with unexpected error: {}\n{}:{}: originally from here",
|
||||
query, std::current_exception(),
|
||||
loc.file_name(), loc.line()));
|
||||
}
|
||||
}
|
||||
|
||||
static void create_time_uuid_fcts_schema(cql_test_env& e) {
|
||||
cquery_nofail(e, "CREATE TABLE t (id int primary key, t timestamp, l bigint, f float, u timeuuid, d date)");
|
||||
cquery_nofail(e, "INSERT INTO t (id, t, l, f, u, d) VALUES "
|
||||
"(1, 1579072460606, 1579072460606000, 1579072460606, a66525e0-3766-11ea-8080-808080808080, '2020-01-13')");
|
||||
cquery_nofail(e, "SELECT * FROM t;");
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_basic_time_uuid_fcts) {
|
||||
return do_with_cql_env_thread([] (auto& e) {
|
||||
create_time_uuid_fcts_schema(e);
|
||||
|
||||
cql_func_require_nofail(e, "currenttime", "");
|
||||
cql_func_require_nofail(e, "currentdate", "");
|
||||
cql_func_require_nofail(e, "now", "");
|
||||
cql_func_require_nofail(e, "currenttimeuuid", "");
|
||||
cql_func_require_nofail(e, "currenttimestamp", "");
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_time_uuid_fcts_input_validation) {
|
||||
return do_with_cql_env_thread([] (auto& e) {
|
||||
create_time_uuid_fcts_schema(e);
|
||||
|
||||
// test timestamp arg
|
||||
auto require_timestamp = [&e] (const sstring& fct) {
|
||||
cql_func_require_nofail(e, fct, "t");
|
||||
cql_func_require_throw<exceptions::server_exception>(e, fct, "l");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "f");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "u");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "d");
|
||||
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttime()");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currentdate()");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "now()");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttimeuuid()");
|
||||
cql_func_require_nofail(e, fct, "currenttimestamp()");
|
||||
};
|
||||
|
||||
require_timestamp("mintimeuuid");
|
||||
require_timestamp("maxtimeuuid");
|
||||
|
||||
// test timeuuid arg
|
||||
auto require_timeuuid = [&e] (const sstring& fct) {
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "t");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "l");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "f");
|
||||
cql_func_require_nofail(e, fct, "u");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "d");
|
||||
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttime()");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currentdate()");
|
||||
cql_func_require_nofail(e, fct, "now()");
|
||||
cql_func_require_nofail(e, fct, "currenttimeuuid()");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttimestamp()");
|
||||
};
|
||||
|
||||
require_timeuuid("dateof");
|
||||
require_timeuuid("unixtimestampof");
|
||||
|
||||
// test timeuuid or date arg
|
||||
auto require_timeuuid_or_date = [&e] (const sstring& fct) {
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "t");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "l");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "f");
|
||||
cql_func_require_nofail(e, fct, "u");
|
||||
cql_func_require_nofail(e, fct, "d");
|
||||
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttime()");
|
||||
cql_func_require_nofail(e, fct, "currentdate()");
|
||||
cql_func_require_nofail(e, fct, "now()");
|
||||
cql_func_require_nofail(e, fct, "currenttimeuuid()");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttimestamp()");
|
||||
};
|
||||
|
||||
require_timeuuid_or_date("totimestamp");
|
||||
|
||||
// test timestamp or timeuuid arg
|
||||
auto require_timestamp_or_timeuuid = [&e] (const sstring& fct) {
|
||||
cql_func_require_nofail(e, fct, "t");
|
||||
cql_func_require_throw<std::exception>(e, fct, "l");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "f");
|
||||
cql_func_require_nofail(e, fct, "u");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "d");
|
||||
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttime()");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currentdate()");
|
||||
cql_func_require_nofail(e, fct, "now()");
|
||||
cql_func_require_nofail(e, fct, "currenttimeuuid()");
|
||||
cql_func_require_nofail(e, fct, "currenttimestamp()");
|
||||
};
|
||||
|
||||
require_timestamp_or_timeuuid("todate");
|
||||
|
||||
// test timestamp, timeuuid, or date arg
|
||||
auto require_timestamp_timeuuid_or_date = [&e] (const sstring& fct) {
|
||||
cql_func_require_nofail(e, fct, "t");
|
||||
cql_func_require_throw<exceptions::server_exception>(e, fct, "l");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "f");
|
||||
cql_func_require_nofail(e, fct, "u");
|
||||
cql_func_require_nofail(e, fct, "d");
|
||||
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "currenttime()");
|
||||
cql_func_require_nofail(e, fct, "currentdate()");
|
||||
cql_func_require_nofail(e, fct, "now()");
|
||||
cql_func_require_nofail(e, fct, "currenttimeuuid()");
|
||||
cql_func_require_nofail(e, fct, "currenttimestamp()");
|
||||
};
|
||||
|
||||
require_timestamp_timeuuid_or_date("tounixtimestamp");
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_time_uuid_fcts_result) {
|
||||
return do_with_cql_env_thread([] (auto& e) {
|
||||
create_time_uuid_fcts_schema(e);
|
||||
|
||||
// test timestamp arg
|
||||
auto require_timestamp = [&e] (const sstring& fct) {
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "mintimeuuid(t)");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "maxtimeuuid(t)");
|
||||
cql_func_require_nofail(e, fct, "dateof(u)");
|
||||
cql_func_require_nofail(e, fct, "unixtimestampof(u)");
|
||||
cql_func_require_nofail(e, fct, "totimestamp(u)");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "todate(u)");
|
||||
cql_func_require_nofail(e, fct, "tounixtimestamp(u)");
|
||||
};
|
||||
|
||||
require_timestamp("mintimeuuid");
|
||||
require_timestamp("maxtimeuuid");
|
||||
|
||||
// test timeuuid arg
|
||||
auto require_timeuuid = [&e] (const sstring& fct) {
|
||||
cql_func_require_nofail(e, fct, "mintimeuuid(t)");
|
||||
cql_func_require_nofail(e, fct, "maxtimeuuid(t)");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "dateof(u)");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "unixtimestampof(u)");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "totimestamp(u)");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "todate(u)");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "tounixtimestamp(u)");
|
||||
};
|
||||
|
||||
require_timeuuid("dateof");
|
||||
require_timeuuid("unixtimestampof");
|
||||
|
||||
// test timeuuid or date arg
|
||||
auto require_timeuuid_or_date = [&e] (const sstring& fct) {
|
||||
cql_func_require_nofail(e, fct, "mintimeuuid(t)");
|
||||
cql_func_require_nofail(e, fct, "maxtimeuuid(t)");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "dateof(u)");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "unixtimestampof(u)");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "totimestamp(u)");
|
||||
cql_func_require_nofail(e, fct, "todate(u)");
|
||||
cql_func_require_throw<exceptions::invalid_request_exception>(e, fct, "tounixtimestamp(u)");
|
||||
};
|
||||
|
||||
require_timeuuid_or_date("totimestamp");
|
||||
|
||||
// test timestamp or timeuuid arg
|
||||
auto require_timestamp_or_timeuuid = [&e] (const sstring& fct) {
|
||||
};
|
||||
|
||||
require_timestamp_or_timeuuid("todate");
|
||||
|
||||
// test timestamp, timeuuid, or date arg
|
||||
auto require_timestamp_timeuuid_or_date = [&e] (const sstring& fct) {
|
||||
cql_func_require_nofail(e, fct, "mintimeuuid(t)");
|
||||
cql_func_require_nofail(e, fct, "maxtimeuuid(t)");
|
||||
cql_func_require_nofail(e, fct, "dateof(u)");
|
||||
cql_func_require_nofail(e, fct, "unixtimestampof(u)");
|
||||
cql_func_require_nofail(e, fct, "totimestamp(u)");
|
||||
cql_func_require_nofail(e, fct, "todate(u)");
|
||||
cql_func_require_nofail(e, fct, "tounixtimestamp(u)");
|
||||
};
|
||||
|
||||
require_timestamp_timeuuid_or_date("tounixtimestamp");
|
||||
});
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@
|
||||
#include <seastar/util/noncopyable_function.hh>
|
||||
|
||||
inline
|
||||
void eventually(noncopyable_function<void ()> f, size_t max_attempts = 12) {
|
||||
void eventually(noncopyable_function<void ()> f, size_t max_attempts = 17) {
|
||||
size_t attempts = 0;
|
||||
while (true) {
|
||||
try {
|
||||
@@ -43,7 +43,7 @@ void eventually(noncopyable_function<void ()> f, size_t max_attempts = 12) {
|
||||
|
||||
inline
|
||||
bool eventually_true(noncopyable_function<bool ()> f) {
|
||||
const unsigned max_attempts = 10;
|
||||
const unsigned max_attempts = 15;
|
||||
unsigned attempts = 0;
|
||||
while (true) {
|
||||
if (f()) {
|
||||
|
||||
@@ -118,6 +118,53 @@ SEASTAR_TEST_CASE(test_multishard_writer) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_multishard_writer_producer_aborts) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
auto test_random_streams = [] (random_mutation_generator&& gen, size_t partition_nr, generate_error error = generate_error::no) {
|
||||
auto muts = gen(partition_nr);
|
||||
schema_ptr s = gen.schema();
|
||||
auto source_reader = partition_nr > 0 ? flat_mutation_reader_from_mutations(muts) : make_empty_flat_reader(s);
|
||||
int mf_produced = 0;
|
||||
auto get_next_mutation_fragment = [&source_reader, &mf_produced] () mutable {
|
||||
if (mf_produced++ > 800) {
|
||||
return make_exception_future<mutation_fragment_opt>(std::runtime_error("the producer failed"));
|
||||
} else {
|
||||
return source_reader(db::no_timeout);
|
||||
}
|
||||
};
|
||||
auto& partitioner = dht::global_partitioner();
|
||||
try {
|
||||
distribute_reader_and_consume_on_shards(s, partitioner,
|
||||
make_generating_reader(s, std::move(get_next_mutation_fragment)),
|
||||
[&partitioner, error] (flat_mutation_reader reader) mutable {
|
||||
if (error) {
|
||||
return make_exception_future<>(std::runtime_error("Failed to write"));
|
||||
}
|
||||
return repeat([&partitioner, reader = std::move(reader), error] () mutable {
|
||||
return reader(db::no_timeout).then([&partitioner, error] (mutation_fragment_opt mf_opt) mutable {
|
||||
if (mf_opt) {
|
||||
if (mf_opt->is_partition_start()) {
|
||||
auto shard = partitioner.shard_of(mf_opt->as_partition_start().key().token());
|
||||
BOOST_REQUIRE_EQUAL(shard, this_shard_id());
|
||||
}
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
} else {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
).get0();
|
||||
} catch (...) {
|
||||
// The distribute_reader_and_consume_on_shards is expected to fail and not block forever
|
||||
}
|
||||
};
|
||||
|
||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no, local_shard_only::yes), 1000, generate_error::no);
|
||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no, local_shard_only::yes), 1000, generate_error::yes);
|
||||
});
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
class bucket_writer {
|
||||
|
||||
@@ -586,7 +586,6 @@ future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_
|
||||
|
||||
auto db_cfg_ptr = make_shared<db::config>();
|
||||
auto& db_cfg = *db_cfg_ptr;
|
||||
db_cfg.experimental_features({experimental_features_t::UDF}, db::config::config_source::CommandLine);
|
||||
if (regenerate) {
|
||||
db_cfg.data_file_directories({data_dir}, db::config::config_source::CommandLine);
|
||||
} else {
|
||||
|
||||
@@ -5262,3 +5262,131 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_rows) {
|
||||
test_sstable_log_too_many_rows_f(random, (random + 1), false);
|
||||
test_sstable_log_too_many_rows_f((random + 1), random, true);
|
||||
}
|
||||
|
||||
// The following test runs on tests/sstables/3.x/uncompressed/legacy_udt_in_collection
|
||||
// It was created using Scylla 3.0.x using the following CQL statements:
|
||||
//
|
||||
// CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
|
||||
// CREATE TYPE ks.ut (a int, b int);
|
||||
// CREATE TABLE ks.t ( pk int PRIMARY KEY,
|
||||
// m map<int, frozen<ut>>,
|
||||
// fm frozen<map<int, frozen<ut>>>,
|
||||
// mm map<int, frozen<map<int, frozen<ut>>>>,
|
||||
// fmm frozen<map<int, frozen<map<int, frozen<ut>>>>>,
|
||||
// s set<frozen<ut>>,
|
||||
// fs frozen<set<frozen<ut>>>,
|
||||
// l list<frozen<ut>>,
|
||||
// fl frozen<list<frozen<ut>>>
|
||||
// ) WITH compression = {};
|
||||
// UPDATE ks.t USING TIMESTAMP 1525385507816568 SET
|
||||
// m[0] = {a: 0, b: 0},
|
||||
// fm = {0: {a: 0, b: 0}},
|
||||
// mm[0] = {0: {a: 0, b: 0}},
|
||||
// fmm = {0: {0: {a: 0, b: 0}}},
|
||||
// s = s + {{a: 0, b: 0}},
|
||||
// fs = {{a: 0, b: 0}},
|
||||
// l[scylla_timeuuid_list_index(7fb27e80-7b12-11ea-9fad-f4d108a9e4a3)] = {a: 0, b: 0},
|
||||
// fl = [{a: 0, b: 0}]
|
||||
// WHERE pk = 0;
|
||||
//
|
||||
// It checks whether a SSTable containing UDTs nested in collections, which contains incorrect serialization headers
|
||||
// (doesn't wrap nested UDTs in the FrozenType<...> tag) can be loaded by new versions of Scylla.
|
||||
|
||||
static const sstring LEGACY_UDT_IN_COLLECTION_PATH =
|
||||
"tests/sstables/3.x/uncompressed/legacy_udt_in_collection";
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_legacy_udt_in_collection_table) {
|
||||
auto abj = defer([] { await_background_jobs().get(); });
|
||||
|
||||
auto ut = user_type_impl::get_instance("ks", to_bytes("ut"),
|
||||
{to_bytes("a"), to_bytes("b")},
|
||||
{int32_type, int32_type}, false);
|
||||
auto m_type = map_type_impl::get_instance(int32_type, ut, true);
|
||||
auto fm_type = map_type_impl::get_instance(int32_type, ut, false);
|
||||
auto mm_type = map_type_impl::get_instance(int32_type, fm_type, true);
|
||||
auto fmm_type = map_type_impl::get_instance(int32_type, fm_type, false);
|
||||
auto s_type = set_type_impl::get_instance(ut, true);
|
||||
auto fs_type = set_type_impl::get_instance(ut, false);
|
||||
auto l_type = list_type_impl::get_instance(ut, true);
|
||||
auto fl_type = list_type_impl::get_instance(ut, false);
|
||||
|
||||
auto s = schema_builder("ks", "t")
|
||||
.with_column("pk", int32_type, column_kind::partition_key)
|
||||
.with_column("m", m_type)
|
||||
.with_column("fm", fm_type)
|
||||
.with_column("mm", mm_type)
|
||||
.with_column("fmm", fmm_type)
|
||||
.with_column("s", s_type)
|
||||
.with_column("fs", fs_type)
|
||||
.with_column("l", l_type)
|
||||
.with_column("fl", fl_type)
|
||||
.set_compressor_params(compression_parameters::no_compression())
|
||||
.build();
|
||||
|
||||
auto m_cdef = s->get_column_definition(to_bytes("m"));
|
||||
auto fm_cdef = s->get_column_definition(to_bytes("fm"));
|
||||
auto mm_cdef = s->get_column_definition(to_bytes("mm"));
|
||||
auto fmm_cdef = s->get_column_definition(to_bytes("fmm"));
|
||||
auto s_cdef = s->get_column_definition(to_bytes("s"));
|
||||
auto fs_cdef = s->get_column_definition(to_bytes("fs"));
|
||||
auto l_cdef = s->get_column_definition(to_bytes("l"));
|
||||
auto fl_cdef = s->get_column_definition(to_bytes("fl"));
|
||||
BOOST_REQUIRE(m_cdef && fm_cdef && mm_cdef && fmm_cdef && s_cdef && fs_cdef && l_cdef && fl_cdef);
|
||||
|
||||
auto ut_val = make_user_value(ut, {int32_t(0), int32_t(0)});
|
||||
auto fm_val = make_map_value(fm_type, {{int32_t(0), ut_val}});
|
||||
auto fmm_val = make_map_value(fmm_type, {{int32_t(0), fm_val}});
|
||||
auto fs_val = make_set_value(fs_type, {ut_val});
|
||||
auto fl_val = make_list_value(fl_type, {ut_val});
|
||||
|
||||
mutation mut{s, partition_key::from_deeply_exploded(*s, {0})};
|
||||
auto ckey = clustering_key::make_empty();
|
||||
|
||||
// m[0] = {a: 0, b: 0}
|
||||
{
|
||||
collection_mutation_description desc;
|
||||
desc.cells.emplace_back(int32_type->decompose(0),
|
||||
atomic_cell::make_live(*ut, write_timestamp, ut->decompose(ut_val), atomic_cell::collection_member::yes));
|
||||
mut.set_clustered_cell(ckey, *m_cdef, desc.serialize(*m_type));
|
||||
}
|
||||
|
||||
// fm = {0: {a: 0, b: 0}}
|
||||
mut.set_clustered_cell(ckey, *fm_cdef, atomic_cell::make_live(*fm_type, write_timestamp, fm_type->decompose(fm_val)));
|
||||
|
||||
// mm[0] = {0: {a: 0, b: 0}},
|
||||
{
|
||||
collection_mutation_description desc;
|
||||
desc.cells.emplace_back(int32_type->decompose(0),
|
||||
atomic_cell::make_live(*fm_type, write_timestamp, fm_type->decompose(fm_val), atomic_cell::collection_member::yes));
|
||||
mut.set_clustered_cell(ckey, *mm_cdef, desc.serialize(*mm_type));
|
||||
}
|
||||
|
||||
// fmm = {0: {0: {a: 0, b: 0}}},
|
||||
mut.set_clustered_cell(ckey, *fmm_cdef, atomic_cell::make_live(*fmm_type, write_timestamp, fmm_type->decompose(fmm_val)));
|
||||
|
||||
// s = s + {{a: 0, b: 0}},
|
||||
{
|
||||
collection_mutation_description desc;
|
||||
desc.cells.emplace_back(ut->decompose(ut_val),
|
||||
atomic_cell::make_live(*bytes_type, write_timestamp, bytes{}, atomic_cell::collection_member::yes));
|
||||
mut.set_clustered_cell(ckey, *s_cdef, desc.serialize(*s_type));
|
||||
}
|
||||
|
||||
// fs = {{a: 0, b: 0}},
|
||||
mut.set_clustered_cell(ckey, *fs_cdef, atomic_cell::make_live(*fs_type, write_timestamp, fs_type->decompose(fs_val)));
|
||||
|
||||
// l[scylla_timeuuid_list_index(7fb27e80-7b12-11ea-9fad-f4d108a9e4a3)] = {a: 0, b: 0},
|
||||
{
|
||||
collection_mutation_description desc;
|
||||
desc.cells.emplace_back(timeuuid_type->decompose(utils::UUID("7fb27e80-7b12-11ea-9fad-f4d108a9e4a3")),
|
||||
atomic_cell::make_live(*ut, write_timestamp, ut->decompose(ut_val), atomic_cell::collection_member::yes));
|
||||
mut.set_clustered_cell(ckey, *l_cdef, desc.serialize(*l_type));
|
||||
}
|
||||
|
||||
// fl = [{a: 0, b: 0}]
|
||||
mut.set_clustered_cell(ckey, *fl_cdef, atomic_cell::make_live(*fl_type, write_timestamp, fl_type->decompose(fl_val)));
|
||||
|
||||
sstable_assertions sst(s, LEGACY_UDT_IN_COLLECTION_PATH);
|
||||
sst.load();
|
||||
assert_that(sst.read_rows_flat()).produces(mut).produces_end_of_stream();
|
||||
}
|
||||
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
3519784297
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,9 @@
|
||||
Scylla.db
|
||||
CRC.db
|
||||
Filter.db
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Digest.crc32
|
||||
Index.db
|
||||
Summary.db
|
||||
Data.db
|
||||
@@ -402,6 +402,8 @@ BOOST_AUTO_TEST_CASE(test_varint) {
|
||||
BOOST_CHECK_EQUAL(value_cast<boost::multiprecision::cpp_int>(varint_type->deserialize(from_hex("00deadbeef"))), boost::multiprecision::cpp_int("0xdeadbeef"));
|
||||
BOOST_CHECK_EQUAL(value_cast<boost::multiprecision::cpp_int>(varint_type->deserialize(from_hex("00ffffffffffffffffffffffffffffffff"))), boost::multiprecision::cpp_int("340282366920938463463374607431768211455"));
|
||||
|
||||
BOOST_REQUIRE_EQUAL(from_hex("80000000"), varint_type->decompose(boost::multiprecision::cpp_int(-2147483648)));
|
||||
|
||||
test_parsing_fails(varint_type, "1A");
|
||||
}
|
||||
|
||||
|
||||
@@ -108,6 +108,7 @@ MAVEN_LOCAL_REPO="$HOME/.m2"
|
||||
mkdir -p "$MAVEN_LOCAL_REPO"
|
||||
|
||||
docker_common_args=(
|
||||
--pids-limit -1 \
|
||||
--network host \
|
||||
-u "$(id -u):$(id -g)" \
|
||||
"${group_args[@]}" \
|
||||
|
||||
@@ -347,6 +347,7 @@ future<std::unique_ptr<cql_server::response>>
|
||||
|
||||
trace_props.set_if<tracing::trace_state_props::log_slow_query>(tracing::tracing::get_local_tracing_instance().slow_query_tracing_enabled());
|
||||
trace_props.set_if<tracing::trace_state_props::full_tracing>(tracing_request != tracing_request_type::not_requested);
|
||||
tracing::trace_state_ptr trace_state;
|
||||
|
||||
if (trace_props) {
|
||||
if (cqlop == cql_binary_opcode::QUERY ||
|
||||
@@ -354,15 +355,15 @@ future<std::unique_ptr<cql_server::response>>
|
||||
cqlop == cql_binary_opcode::EXECUTE ||
|
||||
cqlop == cql_binary_opcode::BATCH) {
|
||||
trace_props.set_if<tracing::trace_state_props::write_on_close>(tracing_request == tracing_request_type::write_on_close);
|
||||
client_state.create_tracing_session(tracing::trace_type::QUERY, trace_props);
|
||||
trace_state = tracing::tracing::get_local_tracing_instance().create_session(tracing::trace_type::QUERY, trace_props);
|
||||
}
|
||||
}
|
||||
|
||||
tracing::set_request_size(client_state.get_trace_state(), fbuf.bytes_left());
|
||||
tracing::set_request_size(trace_state, fbuf.bytes_left());
|
||||
|
||||
auto linearization_buffer = std::make_unique<bytes_ostream>();
|
||||
auto linearization_buffer_ptr = linearization_buffer.get();
|
||||
return futurize_apply([this, cqlop, stream, &fbuf, &client_state, linearization_buffer_ptr, permit = std::move(permit)] () mutable {
|
||||
return futurize_apply([this, cqlop, stream, &fbuf, &client_state, linearization_buffer_ptr, permit = std::move(permit), trace_state] () mutable {
|
||||
// When using authentication, we need to ensure we are doing proper state transitions,
|
||||
// i.e. we cannot simply accept any query/exec ops unless auth is complete
|
||||
switch (client_state.get_auth_state()) {
|
||||
@@ -393,23 +394,23 @@ future<std::unique_ptr<cql_server::response>>
|
||||
return *user;
|
||||
}();
|
||||
|
||||
tracing::set_username(client_state.get_trace_state(), user);
|
||||
tracing::set_username(trace_state, user);
|
||||
|
||||
auto in = request_reader(std::move(fbuf), *linearization_buffer_ptr);
|
||||
switch (cqlop) {
|
||||
case cql_binary_opcode::STARTUP: return process_startup(stream, std::move(in), client_state);
|
||||
case cql_binary_opcode::AUTH_RESPONSE: return process_auth_response(stream, std::move(in), client_state);
|
||||
case cql_binary_opcode::OPTIONS: return process_options(stream, std::move(in), client_state);
|
||||
case cql_binary_opcode::QUERY: return process_query(stream, std::move(in), client_state, std::move(permit));
|
||||
case cql_binary_opcode::PREPARE: return process_prepare(stream, std::move(in), client_state);
|
||||
case cql_binary_opcode::EXECUTE: return process_execute(stream, std::move(in), client_state, std::move(permit));
|
||||
case cql_binary_opcode::BATCH: return process_batch(stream, std::move(in), client_state, std::move(permit));
|
||||
case cql_binary_opcode::REGISTER: return process_register(stream, std::move(in), client_state);
|
||||
case cql_binary_opcode::STARTUP: return process_startup(stream, std::move(in), client_state, trace_state);
|
||||
case cql_binary_opcode::AUTH_RESPONSE: return process_auth_response(stream, std::move(in), client_state, trace_state);
|
||||
case cql_binary_opcode::OPTIONS: return process_options(stream, std::move(in), client_state, trace_state);
|
||||
case cql_binary_opcode::QUERY: return process_query(stream, std::move(in), client_state, std::move(permit), trace_state);
|
||||
case cql_binary_opcode::PREPARE: return process_prepare(stream, std::move(in), client_state, trace_state);
|
||||
case cql_binary_opcode::EXECUTE: return process_execute(stream, std::move(in), client_state, std::move(permit), trace_state);
|
||||
case cql_binary_opcode::BATCH: return process_batch(stream, std::move(in), client_state, std::move(permit), trace_state);
|
||||
case cql_binary_opcode::REGISTER: return process_register(stream, std::move(in), client_state, trace_state);
|
||||
default: throw exceptions::protocol_exception(format("Unknown opcode {:d}", int(cqlop)));
|
||||
}
|
||||
}).then_wrapped([this, cqlop, stream, &client_state, linearization_buffer = std::move(linearization_buffer)] (future<std::unique_ptr<cql_server::response>> f) -> std::unique_ptr<cql_server::response> {
|
||||
}).then_wrapped([this, cqlop, stream, &client_state, linearization_buffer = std::move(linearization_buffer), trace_state] (future<std::unique_ptr<cql_server::response>> f) -> std::unique_ptr<cql_server::response> {
|
||||
auto stop_trace = defer([&] {
|
||||
tracing::stop_foreground(client_state.get_trace_state());
|
||||
tracing::stop_foreground(trace_state);
|
||||
});
|
||||
--_server._requests_serving;
|
||||
try {
|
||||
@@ -440,28 +441,28 @@ future<std::unique_ptr<cql_server::response>>
|
||||
break;
|
||||
}
|
||||
|
||||
tracing::set_response_size(client_state.get_trace_state(), response->size());
|
||||
tracing::set_response_size(trace_state, response->size());
|
||||
return response;
|
||||
} catch (const exceptions::unavailable_exception& ex) {
|
||||
return make_unavailable_error(stream, ex.code(), ex.what(), ex.consistency, ex.required, ex.alive, client_state.get_trace_state());
|
||||
return make_unavailable_error(stream, ex.code(), ex.what(), ex.consistency, ex.required, ex.alive, trace_state);
|
||||
} catch (const exceptions::read_timeout_exception& ex) {
|
||||
return make_read_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.data_present, client_state.get_trace_state());
|
||||
return make_read_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.data_present, trace_state);
|
||||
} catch (const exceptions::read_failure_exception& ex) {
|
||||
return make_read_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.data_present, client_state.get_trace_state());
|
||||
return make_read_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.data_present, trace_state);
|
||||
} catch (const exceptions::mutation_write_timeout_exception& ex) {
|
||||
return make_mutation_write_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.type, client_state.get_trace_state());
|
||||
return make_mutation_write_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.type, trace_state);
|
||||
} catch (const exceptions::mutation_write_failure_exception& ex) {
|
||||
return make_mutation_write_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.type, client_state.get_trace_state());
|
||||
return make_mutation_write_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.type, trace_state);
|
||||
} catch (const exceptions::already_exists_exception& ex) {
|
||||
return make_already_exists_error(stream, ex.code(), ex.what(), ex.ks_name, ex.cf_name, client_state.get_trace_state());
|
||||
return make_already_exists_error(stream, ex.code(), ex.what(), ex.ks_name, ex.cf_name, trace_state);
|
||||
} catch (const exceptions::prepared_query_not_found_exception& ex) {
|
||||
return make_unprepared_error(stream, ex.code(), ex.what(), ex.id, client_state.get_trace_state());
|
||||
return make_unprepared_error(stream, ex.code(), ex.what(), ex.id, trace_state);
|
||||
} catch (const exceptions::cassandra_exception& ex) {
|
||||
return make_error(stream, ex.code(), ex.what(), client_state.get_trace_state());
|
||||
return make_error(stream, ex.code(), ex.what(), trace_state);
|
||||
} catch (std::exception& ex) {
|
||||
return make_error(stream, exceptions::exception_code::SERVER_ERROR, ex.what(), client_state.get_trace_state());
|
||||
return make_error(stream, exceptions::exception_code::SERVER_ERROR, ex.what(), trace_state);
|
||||
} catch (...) {
|
||||
return make_error(stream, exceptions::exception_code::SERVER_ERROR, "unknown error", client_state.get_trace_state());
|
||||
return make_error(stream, exceptions::exception_code::SERVER_ERROR, "unknown error", trace_state);
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -661,8 +662,8 @@ future<fragmented_temporary_buffer> cql_server::connection::read_and_decompress_
|
||||
return _buffer_reader.read_exactly(_read_buf, length);
|
||||
}
|
||||
|
||||
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_startup(uint16_t stream, request_reader in, service::client_state& client_state)
|
||||
{
|
||||
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_startup(uint16_t stream, request_reader in, service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state) {
|
||||
auto options = in.read_string_map();
|
||||
auto compression_opt = options.find("COMPRESSION");
|
||||
if (compression_opt != options.end()) {
|
||||
@@ -678,33 +679,31 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_st
|
||||
}
|
||||
auto& a = client_state.get_auth_service()->underlying_authenticator();
|
||||
if (a.require_authentication()) {
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_autheticate(stream, a.qualified_java_name(), client_state.get_trace_state()));
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_autheticate(stream, a.qualified_java_name(), trace_state));
|
||||
}
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_ready(stream, client_state.get_trace_state()));
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_ready(stream, trace_state));
|
||||
}
|
||||
|
||||
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_auth_response(uint16_t stream, request_reader in, service::client_state& client_state)
|
||||
{
|
||||
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_auth_response(uint16_t stream, request_reader in, service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state) {
|
||||
auto sasl_challenge = client_state.get_auth_service()->underlying_authenticator().new_sasl_challenge();
|
||||
auto buf = in.read_raw_bytes_view(in.bytes_left());
|
||||
auto challenge = sasl_challenge->evaluate_response(buf);
|
||||
if (sasl_challenge->is_complete()) {
|
||||
return sasl_challenge->get_authenticated_user().then([this, sasl_challenge, stream, &client_state, challenge = std::move(challenge)](auth::authenticated_user user) mutable {
|
||||
return sasl_challenge->get_authenticated_user().then([this, sasl_challenge, stream, &client_state, challenge = std::move(challenge), trace_state](auth::authenticated_user user) mutable {
|
||||
client_state.set_login(::make_shared<auth::authenticated_user>(std::move(user)));
|
||||
auto f = client_state.check_user_can_login();
|
||||
return f.then([this, stream, &client_state, challenge = std::move(challenge)]() mutable {
|
||||
auto tr_state = client_state.get_trace_state();
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_auth_success(stream, std::move(challenge), tr_state));
|
||||
return f.then([this, stream, &client_state, challenge = std::move(challenge), trace_state]() mutable {
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_auth_success(stream, std::move(challenge), trace_state));
|
||||
});
|
||||
});
|
||||
}
|
||||
auto tr_state = client_state.get_trace_state();
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_auth_challenge(stream, std::move(challenge), tr_state));
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_auth_challenge(stream, std::move(challenge), trace_state));
|
||||
}
|
||||
|
||||
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_options(uint16_t stream, request_reader in, service::client_state& client_state)
|
||||
{
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_supported(stream, client_state.get_trace_state()));
|
||||
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_options(uint16_t stream, request_reader in, service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state) {
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_supported(stream, std::move(trace_state)));
|
||||
}
|
||||
|
||||
void
|
||||
@@ -712,10 +711,10 @@ cql_server::connection::init_cql_serialization_format() {
|
||||
_cql_serialization_format = cql_serialization_format(_version);
|
||||
}
|
||||
|
||||
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit)
|
||||
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state)
|
||||
{
|
||||
auto query = in.read_long_string_view();
|
||||
auto q_state = std::make_unique<cql_query_state>(client_state, std::move(permit));
|
||||
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
|
||||
auto& query_state = q_state->query_state;
|
||||
q_state->options = in.read_options(_version, _cql_serialization_format, this->timeout_config(), _server._cql_config);
|
||||
auto& options = *q_state->options;
|
||||
@@ -735,12 +734,12 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_qu
|
||||
}).finally([q_state = std::move(q_state)] {});
|
||||
}
|
||||
|
||||
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_prepare(uint16_t stream, request_reader in, service::client_state& client_state)
|
||||
{
|
||||
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_prepare(uint16_t stream, request_reader in, service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state) {
|
||||
auto query = sstring(in.read_long_string_view());
|
||||
|
||||
tracing::add_query(client_state.get_trace_state(), query);
|
||||
tracing::begin(client_state.get_trace_state(), "Preparing CQL3 query", client_state.get_client_address());
|
||||
tracing::add_query(trace_state, query);
|
||||
tracing::begin(trace_state, "Preparing CQL3 query", client_state.get_client_address());
|
||||
|
||||
auto cpu_id = engine().cpu_id();
|
||||
auto cpus = boost::irange(0u, smp::count);
|
||||
@@ -752,19 +751,19 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_pr
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
}).then([this, query, stream, &client_state] () mutable {
|
||||
tracing::trace(client_state.get_trace_state(), "Done preparing on remote shards");
|
||||
return _server._query_processor.local().prepare(std::move(query), client_state, false).then([this, stream, &client_state] (auto msg) {
|
||||
tracing::trace(client_state.get_trace_state(), "Done preparing on a local shard - preparing a result. ID is [{}]", seastar::value_of([&msg] {
|
||||
}).then([this, query, stream, &client_state, trace_state] () mutable {
|
||||
tracing::trace(trace_state, "Done preparing on remote shards");
|
||||
return _server._query_processor.local().prepare(std::move(query), client_state, false).then([this, stream, &client_state, trace_state] (auto msg) {
|
||||
tracing::trace(trace_state, "Done preparing on a local shard - preparing a result. ID is [{}]", seastar::value_of([&msg] {
|
||||
return messages::result_message::prepared::cql::get_id(msg);
|
||||
}));
|
||||
return this->make_result(stream, msg, client_state.get_trace_state());
|
||||
return this->make_result(stream, msg, trace_state);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_execute(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit)
|
||||
{
|
||||
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_execute(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit,
|
||||
tracing::trace_state_ptr trace_state) {
|
||||
cql3::prepared_cache_key_type cache_key(in.read_short_bytes());
|
||||
auto& id = cql3::prepared_cache_key_type::cql_id(cache_key);
|
||||
bool needs_authorization = false;
|
||||
@@ -781,7 +780,7 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_ex
|
||||
throw exceptions::prepared_query_not_found_exception(id);
|
||||
}
|
||||
|
||||
auto q_state = std::make_unique<cql_query_state>(client_state, std::move(permit));
|
||||
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
|
||||
auto& query_state = q_state->query_state;
|
||||
if (_version == 1) {
|
||||
std::vector<cql3::raw_value_view> values;
|
||||
@@ -795,22 +794,22 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_ex
|
||||
auto& options = *q_state->options;
|
||||
auto skip_metadata = options.skip_metadata();
|
||||
|
||||
tracing::set_page_size(client_state.get_trace_state(), options.get_page_size());
|
||||
tracing::set_consistency_level(client_state.get_trace_state(), options.get_consistency());
|
||||
tracing::set_optional_serial_consistency_level(client_state.get_trace_state(), options.get_serial_consistency());
|
||||
tracing::add_query(client_state.get_trace_state(), prepared->raw_cql_statement);
|
||||
tracing::add_prepared_statement(client_state.get_trace_state(), prepared);
|
||||
tracing::set_page_size(trace_state, options.get_page_size());
|
||||
tracing::set_consistency_level(trace_state, options.get_consistency());
|
||||
tracing::set_optional_serial_consistency_level(trace_state, options.get_serial_consistency());
|
||||
tracing::add_query(trace_state, prepared->raw_cql_statement);
|
||||
tracing::add_prepared_statement(trace_state, prepared);
|
||||
|
||||
tracing::begin(client_state.get_trace_state(), seastar::value_of([&id] { return seastar::format("Execute CQL3 prepared query [{}]", id); }),
|
||||
tracing::begin(trace_state, seastar::value_of([&id] { return seastar::format("Execute CQL3 prepared query [{}]", id); }),
|
||||
client_state.get_client_address());
|
||||
|
||||
auto stmt = prepared->statement;
|
||||
tracing::trace(query_state.get_trace_state(), "Checking bounds");
|
||||
tracing::trace(trace_state, "Checking bounds");
|
||||
if (stmt->get_bound_terms() != options.get_values_count()) {
|
||||
const auto msg = format("Invalid amount of bind variables: expected {:d} received {:d}",
|
||||
stmt->get_bound_terms(),
|
||||
options.get_values_count());
|
||||
tracing::trace(query_state.get_trace_state(), msg);
|
||||
tracing::trace(trace_state, msg);
|
||||
throw exceptions::invalid_request_exception(msg);
|
||||
}
|
||||
|
||||
@@ -828,7 +827,7 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_ex
|
||||
}
|
||||
|
||||
future<std::unique_ptr<cql_server::response>>
|
||||
cql_server::connection::process_batch(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit)
|
||||
cql_server::connection::process_batch(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state)
|
||||
{
|
||||
if (_version == 1) {
|
||||
throw exceptions::protocol_exception("BATCH messages are not support in version 1 of the protocol");
|
||||
@@ -844,7 +843,7 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic
|
||||
modifications.reserve(n);
|
||||
values.reserve(n);
|
||||
|
||||
tracing::begin(client_state.get_trace_state(), "Execute batch of CQL3 queries", client_state.get_client_address());
|
||||
tracing::begin(trace_state, "Execute batch of CQL3 queries", client_state.get_client_address());
|
||||
|
||||
for ([[gnu::unused]] auto i : boost::irange(0u, n)) {
|
||||
const auto kind = in.read_byte();
|
||||
@@ -858,7 +857,7 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic
|
||||
auto query = in.read_long_string_view();
|
||||
stmt_ptr = _server._query_processor.local().get_statement(query, client_state);
|
||||
ps = stmt_ptr->checked_weak_from_this();
|
||||
tracing::add_query(client_state.get_trace_state(), query);
|
||||
tracing::add_query(trace_state, query);
|
||||
break;
|
||||
}
|
||||
case 1: {
|
||||
@@ -877,7 +876,7 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic
|
||||
needs_authorization = pending_authorization_entries.emplace(std::move(cache_key), ps->checked_weak_from_this()).second;
|
||||
}
|
||||
|
||||
tracing::add_query(client_state.get_trace_state(), ps->raw_cql_statement);
|
||||
tracing::add_query(trace_state, ps->raw_cql_statement);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
@@ -891,8 +890,8 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic
|
||||
}
|
||||
|
||||
::shared_ptr<cql3::statements::modification_statement> modif_statement_ptr = static_pointer_cast<cql3::statements::modification_statement>(ps->statement);
|
||||
tracing::add_table_name(client_state.get_trace_state(), modif_statement_ptr->keyspace(), modif_statement_ptr->column_family());
|
||||
tracing::add_prepared_statement(client_state.get_trace_state(), ps);
|
||||
tracing::add_table_name(trace_state, modif_statement_ptr->keyspace(), modif_statement_ptr->column_family());
|
||||
tracing::add_prepared_statement(trace_state, ps);
|
||||
|
||||
modifications.emplace_back(std::move(modif_statement_ptr), needs_authorization);
|
||||
|
||||
@@ -907,15 +906,15 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic
|
||||
values.emplace_back(std::move(tmp));
|
||||
}
|
||||
|
||||
auto q_state = std::make_unique<cql_query_state>(client_state, std::move(permit));
|
||||
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
|
||||
auto& query_state = q_state->query_state;
|
||||
// #563. CQL v2 encodes query_options in v1 format for batch requests.
|
||||
q_state->options = std::make_unique<cql3::query_options>(cql3::query_options::make_batch_options(std::move(*in.read_options(_version < 3 ? 1 : _version, _cql_serialization_format, this->timeout_config(), _server._cql_config)), std::move(values)));
|
||||
auto& options = *q_state->options;
|
||||
|
||||
tracing::set_consistency_level(client_state.get_trace_state(), options.get_consistency());
|
||||
tracing::set_optional_serial_consistency_level(client_state.get_trace_state(), options.get_serial_consistency());
|
||||
tracing::trace(client_state.get_trace_state(), "Creating a batch statement");
|
||||
tracing::set_consistency_level(trace_state, options.get_consistency());
|
||||
tracing::set_optional_serial_consistency_level(trace_state, options.get_serial_consistency());
|
||||
tracing::trace(trace_state, "Creating a batch statement");
|
||||
|
||||
auto batch = ::make_shared<cql3::statements::batch_statement>(cql3::statements::batch_statement::type(type), std::move(modifications), cql3::attributes::none(), _server._query_processor.local().get_cql_stats());
|
||||
return _server._query_processor.local().process_batch(batch, query_state, options, std::move(pending_authorization_entries)).then([this, stream, batch, &query_state] (auto msg) {
|
||||
@@ -928,15 +927,15 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic
|
||||
}
|
||||
|
||||
future<std::unique_ptr<cql_server::response>>
|
||||
cql_server::connection::process_register(uint16_t stream, request_reader in, service::client_state& client_state)
|
||||
{
|
||||
cql_server::connection::process_register(uint16_t stream, request_reader in, service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state) {
|
||||
std::vector<sstring> event_types;
|
||||
in.read_string_list(event_types);
|
||||
for (auto&& event_type : event_types) {
|
||||
auto et = parse_event_type(event_type);
|
||||
_server._notifier->register_event(et, this);
|
||||
}
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_ready(stream, client_state.get_trace_state()));
|
||||
return make_ready_future<std::unique_ptr<cql_server::response>>(make_ready(stream, std::move(trace_state)));
|
||||
}
|
||||
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_unavailable_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t required, int32_t alive, const tracing::trace_state_ptr& tr_state)
|
||||
|
||||
@@ -94,8 +94,8 @@ struct cql_query_state {
|
||||
service::query_state query_state;
|
||||
std::unique_ptr<cql3::query_options> options;
|
||||
|
||||
cql_query_state(service::client_state& client_state, service_permit permit)
|
||||
: query_state(client_state, std::move(permit))
|
||||
cql_query_state(service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit)
|
||||
: query_state(client_state, std::move(trace_state), std::move(permit))
|
||||
{ }
|
||||
};
|
||||
|
||||
@@ -186,14 +186,14 @@ private:
|
||||
cql_binary_frame_v3 parse_frame(temporary_buffer<char> buf);
|
||||
future<fragmented_temporary_buffer> read_and_decompress_frame(size_t length, uint8_t flags);
|
||||
future<std::optional<cql_binary_frame_v3>> read_frame();
|
||||
future<std::unique_ptr<cql_server::response>> process_startup(uint16_t stream, request_reader in, service::client_state& client_state);
|
||||
future<std::unique_ptr<cql_server::response>> process_auth_response(uint16_t stream, request_reader in, service::client_state& client_state);
|
||||
future<std::unique_ptr<cql_server::response>> process_options(uint16_t stream, request_reader in, service::client_state& client_state);
|
||||
future<std::unique_ptr<cql_server::response>> process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit);
|
||||
future<std::unique_ptr<cql_server::response>> process_prepare(uint16_t stream, request_reader in, service::client_state& client_state);
|
||||
future<std::unique_ptr<cql_server::response>> process_execute(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit);
|
||||
future<std::unique_ptr<cql_server::response>> process_batch(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit);
|
||||
future<std::unique_ptr<cql_server::response>> process_register(uint16_t stream, request_reader in, service::client_state& client_state);
|
||||
future<std::unique_ptr<cql_server::response>> process_startup(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state);
|
||||
future<std::unique_ptr<cql_server::response>> process_auth_response(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state);
|
||||
future<std::unique_ptr<cql_server::response>> process_options(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state);
|
||||
future<std::unique_ptr<cql_server::response>> process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state);
|
||||
future<std::unique_ptr<cql_server::response>> process_prepare(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state);
|
||||
future<std::unique_ptr<cql_server::response>> process_execute(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state);
|
||||
future<std::unique_ptr<cql_server::response>> process_batch(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state);
|
||||
future<std::unique_ptr<cql_server::response>> process_register(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state);
|
||||
|
||||
std::unique_ptr<cql_server::response> make_unavailable_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t required, int32_t alive, const tracing::trace_state_ptr& tr_state);
|
||||
std::unique_ptr<cql_server::response> make_read_timeout_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state);
|
||||
|
||||
17
types.cc
17
types.cc
@@ -2253,13 +2253,20 @@ static size_t concrete_serialized_size(const string_type_impl::native_type& v) {
|
||||
static size_t concrete_serialized_size(const bytes_type_impl::native_type& v) { return v.size(); }
|
||||
static size_t concrete_serialized_size(const inet_addr_type_impl::native_type& v) { return v.get().size(); }
|
||||
|
||||
static size_t concrete_serialized_size(const varint_type_impl::native_type& v) {
|
||||
const auto& num = v.get();
|
||||
if (!num) {
|
||||
static size_t concrete_serialized_size_aux(const boost::multiprecision::cpp_int& num) {
|
||||
if (num) {
|
||||
return align_up(boost::multiprecision::msb(num) + 2, 8u) / 8;
|
||||
} else {
|
||||
return 1;
|
||||
}
|
||||
auto pnum = abs(num);
|
||||
return align_up(boost::multiprecision::msb(pnum) + 2, 8u) / 8;
|
||||
}
|
||||
|
||||
static size_t concrete_serialized_size(const varint_type_impl::native_type& v) {
|
||||
const auto& num = v.get();
|
||||
if (num < 0) {
|
||||
return concrete_serialized_size_aux(-num - 1);
|
||||
}
|
||||
return concrete_serialized_size_aux(num);
|
||||
}
|
||||
|
||||
static size_t concrete_serialized_size(const decimal_type_impl::native_type& v) {
|
||||
|
||||
@@ -59,11 +59,15 @@ public:
|
||||
return (most_sig_bits >> 12) & 0xf;
|
||||
}
|
||||
|
||||
bool is_timestamp() const {
|
||||
return version() == 1;
|
||||
}
|
||||
|
||||
int64_t timestamp() const {
|
||||
//if (version() != 1) {
|
||||
// throw new UnsupportedOperationException("Not a time-based UUID");
|
||||
//}
|
||||
assert(version() == 1);
|
||||
assert(is_timestamp());
|
||||
|
||||
return ((most_sig_bits & 0xFFF) << 48) |
|
||||
(((most_sig_bits >> 16) & 0xFFFF) << 32) |
|
||||
|
||||
@@ -77,7 +77,7 @@ private:
|
||||
// placement of this singleton is important. It needs to be instantiated *AFTER* the other statics.
|
||||
static thread_local const std::unique_ptr<UUID_gen> instance;
|
||||
|
||||
int64_t last_nanos = 0;
|
||||
uint64_t last_nanos = 0;
|
||||
|
||||
UUID_gen()
|
||||
{
|
||||
@@ -93,7 +93,9 @@ public:
|
||||
*/
|
||||
static UUID get_time_UUID()
|
||||
{
|
||||
return UUID(instance->create_time_safe(), clock_seq_and_node);
|
||||
auto uuid = UUID(instance->create_time_safe(), clock_seq_and_node);
|
||||
assert(uuid.is_timestamp());
|
||||
return uuid;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -103,7 +105,9 @@ public:
|
||||
*/
|
||||
static UUID get_time_UUID(int64_t when)
|
||||
{
|
||||
return UUID(create_time(from_unix_timestamp(when)), clock_seq_and_node);
|
||||
auto uuid = UUID(create_time(from_unix_timestamp(when)), clock_seq_and_node);
|
||||
assert(uuid.is_timestamp());
|
||||
return uuid;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -117,12 +121,16 @@ public:
|
||||
// "nanos" needs to be in 100ns intervals since the adoption of the Gregorian calendar in the West.
|
||||
uint64_t nanos = duration_cast<nanoseconds>(tp.time_since_epoch()).count() / 100;
|
||||
nanos -= (10000ULL * START_EPOCH);
|
||||
return UUID(create_time(nanos), clock_seq_and_node);
|
||||
auto uuid = UUID(create_time(nanos), clock_seq_and_node);
|
||||
assert(uuid.is_timestamp());
|
||||
return uuid;
|
||||
}
|
||||
|
||||
static UUID get_time_UUID(int64_t when, int64_t clock_seq_and_node)
|
||||
{
|
||||
return UUID(create_time(from_unix_timestamp(when)), clock_seq_and_node);
|
||||
auto uuid = UUID(create_time(from_unix_timestamp(when)), clock_seq_and_node);
|
||||
assert(uuid.is_timestamp());
|
||||
return uuid;
|
||||
}
|
||||
/**
|
||||
* Similar to get_time_UUID, but randomize the clock and sequence.
|
||||
@@ -142,7 +150,14 @@ public:
|
||||
int64_t when_in_millis = when_in_micros / 1000;
|
||||
int64_t nanos = (when_in_micros - (when_in_millis * 1000)) * 10;
|
||||
|
||||
return UUID(create_time(from_unix_timestamp(when_in_millis) + nanos), rand_dist(rand_gen));
|
||||
auto uuid = UUID(create_time(from_unix_timestamp(when_in_millis) + nanos), rand_dist(rand_gen));
|
||||
assert(uuid.is_timestamp());
|
||||
return uuid;
|
||||
}
|
||||
|
||||
/** validates uuid from raw bytes. */
|
||||
static bool is_valid_UUID(bytes raw) {
|
||||
return raw.size() == 16;
|
||||
}
|
||||
|
||||
/** creates uuid from raw bytes. */
|
||||
@@ -198,7 +213,9 @@ public:
|
||||
*/
|
||||
static UUID min_time_UUID(int64_t timestamp)
|
||||
{
|
||||
return UUID(create_time(from_unix_timestamp(timestamp)), MIN_CLOCK_SEQ_AND_NODE);
|
||||
auto uuid = UUID(create_time(from_unix_timestamp(timestamp)), MIN_CLOCK_SEQ_AND_NODE);
|
||||
assert(uuid.is_timestamp());
|
||||
return uuid;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -214,7 +231,9 @@ public:
|
||||
// timestamp 1ms, then we should not extend 100's nanoseconds
|
||||
// precision by taking 10000, but rather 19999.
|
||||
int64_t uuid_tstamp = from_unix_timestamp(timestamp + 1) - 1;
|
||||
return UUID(create_time(uuid_tstamp), MAX_CLOCK_SEQ_AND_NODE);
|
||||
auto uuid = UUID(create_time(uuid_tstamp), MAX_CLOCK_SEQ_AND_NODE);
|
||||
assert(uuid.is_timestamp());
|
||||
return uuid;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -308,6 +327,15 @@ public:
|
||||
return (uuid.timestamp() / 10000) + START_EPOCH;
|
||||
}
|
||||
|
||||
static uint64_t make_nanos_since(int64_t millis) {
|
||||
return (static_cast<uint64_t>(millis) - static_cast<uint64_t>(START_EPOCH)) * 10000;
|
||||
}
|
||||
|
||||
// nanos_since must fit in 60 bits
|
||||
static bool is_valid_nanos_since(uint64_t nanos_since) {
|
||||
return !(0xf000000000000000UL & nanos_since);
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
// needs to return two different values for the same when.
|
||||
@@ -319,7 +347,7 @@ private:
|
||||
using namespace std::chrono;
|
||||
int64_t millis = duration_cast<milliseconds>(
|
||||
system_clock::now().time_since_epoch()).count();
|
||||
int64_t nanos_since = (millis - START_EPOCH) * 10000;
|
||||
uint64_t nanos_since = make_nanos_since(millis);
|
||||
if (nanos_since > last_nanos)
|
||||
last_nanos = nanos_since;
|
||||
else
|
||||
@@ -330,16 +358,17 @@ private:
|
||||
|
||||
int64_t create_time_unsafe(int64_t when, int nanos)
|
||||
{
|
||||
uint64_t nanos_since = ((when - START_EPOCH) * 10000) + nanos;
|
||||
uint64_t nanos_since = make_nanos_since(when) + static_cast<uint64_t>(static_cast<int64_t>(nanos));
|
||||
return create_time(nanos_since);
|
||||
}
|
||||
|
||||
static int64_t create_time(uint64_t nanos_since)
|
||||
{
|
||||
uint64_t msb = 0L;
|
||||
assert(is_valid_nanos_since(nanos_since));
|
||||
msb |= (0x00000000ffffffffL & nanos_since) << 32;
|
||||
msb |= (0x0000ffff00000000UL & nanos_since) >> 16;
|
||||
msb |= (0xffff000000000000UL & nanos_since) >> 48;
|
||||
msb |= (0x0fff000000000000UL & nanos_since) >> 48;
|
||||
msb |= 0x0000000000001000L; // sets the version to 1.
|
||||
return msb;
|
||||
}
|
||||
|
||||
@@ -2064,6 +2064,17 @@ bool segment_pool::migrate_segment(segment* src, segment* dst)
|
||||
}
|
||||
|
||||
void tracker::impl::register_region(region::impl* r) {
|
||||
// If needed, increase capacity of regions before taking the reclaim lock,
|
||||
// to avoid failing an allocation when push_back() tries to increase
|
||||
// capacity.
|
||||
//
|
||||
// The capacity increase is atomic (wrt _regions) so it cannot be
|
||||
// observed
|
||||
if (_regions.size() == _regions.capacity()) {
|
||||
auto copy = _regions;
|
||||
copy.reserve(copy.capacity() * 2);
|
||||
_regions = std::move(copy);
|
||||
}
|
||||
reclaiming_lock _(*this);
|
||||
_regions.push_back(r);
|
||||
llogger.debug("Registered region @{} with id={}", r, r->id());
|
||||
|
||||
Reference in New Issue
Block a user