Compare commits
47 Commits
branch-2.2
...
branch-2.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d27eb734a7 | ||
|
|
e6aeb490b5 | ||
|
|
2e3b09b593 | ||
|
|
92c74f4e0b | ||
|
|
89d835e9e3 | ||
|
|
263a740084 | ||
|
|
7f24b5319e | ||
|
|
fe16c0e985 | ||
|
|
f85badaaac | ||
|
|
2193d41683 | ||
|
|
1e1f0c29bf | ||
|
|
84d4588b5f | ||
|
|
7b43b26709 | ||
|
|
0ed01acf15 | ||
|
|
7ce160f408 | ||
|
|
5017d9b46a | ||
|
|
50b6ab3552 | ||
|
|
b1652823aa | ||
|
|
02b24aec34 | ||
|
|
22eea4d8cf | ||
|
|
d257f6d57c | ||
|
|
6fca92ac3c | ||
|
|
26e3917046 | ||
|
|
3892594a93 | ||
|
|
4b24439841 | ||
|
|
a02a4592d8 | ||
|
|
b6e1c08451 | ||
|
|
9469afcd27 | ||
|
|
240b9f122b | ||
|
|
cb16cd7724 | ||
|
|
c864d198fc | ||
|
|
25125e9c4f | ||
|
|
faf10fe6aa | ||
|
|
f76269cdcf | ||
|
|
a9b0ccf116 | ||
|
|
abc5941f87 | ||
|
|
a152ac12af | ||
|
|
c274fdf2ec | ||
|
|
5b88d6b4d6 | ||
|
|
2d626e1cf8 | ||
|
|
c11bd3e1cf | ||
|
|
9df3df92bc | ||
|
|
8ad9578a6c | ||
|
|
4cb6061a9f | ||
|
|
1940e6bd95 | ||
|
|
044cfde5f3 | ||
|
|
262a246436 |
@@ -1,6 +1,6 @@
|
||||
#!/bin/sh
|
||||
|
||||
VERSION=2.2.rc2
|
||||
VERSION=2.2.2
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -2193,11 +2193,11 @@
|
||||
"description":"The column family"
|
||||
},
|
||||
"total":{
|
||||
"type":"int",
|
||||
"type":"long",
|
||||
"description":"The total snapshot size"
|
||||
},
|
||||
"live":{
|
||||
"type":"int",
|
||||
"type":"long",
|
||||
"description":"The live snapshot size"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -149,7 +149,9 @@ static sstring gensalt() {
|
||||
// blowfish 2011 fix, blowfish, sha512, sha256, md5
|
||||
for (sstring pfx : { "$2y$", "$2a$", "$6$", "$5$", "$1$" }) {
|
||||
salt = pfx + input;
|
||||
if (crypt_r("fisk", salt.c_str(), &tlcrypt)) {
|
||||
const char* e = crypt_r("fisk", salt.c_str(), &tlcrypt);
|
||||
|
||||
if (e && (e[0] != '*')) {
|
||||
prefix = pfx;
|
||||
return salt;
|
||||
}
|
||||
|
||||
@@ -127,7 +127,7 @@ public:
|
||||
|
||||
class compaction_controller : public backlog_controller {
|
||||
public:
|
||||
static constexpr unsigned normalization_factor = 10;
|
||||
static constexpr unsigned normalization_factor = 30;
|
||||
compaction_controller(seastar::scheduling_group sg, const ::io_priority_class& iop, float static_shares) : backlog_controller(sg, iop, static_shares) {}
|
||||
compaction_controller(seastar::scheduling_group sg, const ::io_priority_class& iop, std::chrono::milliseconds interval, std::function<float()> current_backlog)
|
||||
: backlog_controller(sg, iop, std::move(interval),
|
||||
|
||||
@@ -60,6 +60,7 @@ class cache_flat_mutation_reader final : public flat_mutation_reader::impl {
|
||||
// - _next_row_in_range = _next.position() < _upper_bound
|
||||
// - _last_row points at a direct predecessor of the next row which is going to be read.
|
||||
// Used for populating continuity.
|
||||
// - _population_range_starts_before_all_rows is set accordingly
|
||||
reading_from_underlying,
|
||||
|
||||
end_of_stream
|
||||
@@ -86,6 +87,13 @@ class cache_flat_mutation_reader final : public flat_mutation_reader::impl {
|
||||
partition_snapshot_row_cursor _next_row;
|
||||
bool _next_row_in_range = false;
|
||||
|
||||
// True iff current population interval, since the previous clustering row, starts before all clustered rows.
|
||||
// We cannot just look at _lower_bound, because emission of range tombstones changes _lower_bound and
|
||||
// because we mark clustering intervals as continuous when consuming a clustering_row, it would prevent
|
||||
// us from marking the interval as continuous.
|
||||
// Valid when _state == reading_from_underlying.
|
||||
bool _population_range_starts_before_all_rows;
|
||||
|
||||
future<> do_fill_buffer(db::timeout_clock::time_point);
|
||||
void copy_from_cache_to_buffer();
|
||||
future<> process_static_row(db::timeout_clock::time_point);
|
||||
@@ -226,6 +234,7 @@ inline
|
||||
future<> cache_flat_mutation_reader::do_fill_buffer(db::timeout_clock::time_point timeout) {
|
||||
if (_state == state::move_to_underlying) {
|
||||
_state = state::reading_from_underlying;
|
||||
_population_range_starts_before_all_rows = _lower_bound.is_before_all_clustered_rows(*_schema);
|
||||
auto end = _next_row_in_range ? position_in_partition(_next_row.position())
|
||||
: position_in_partition(_upper_bound);
|
||||
return _read_context->fast_forward_to(position_range{_lower_bound, std::move(end)}, timeout).then([this, timeout] {
|
||||
@@ -351,7 +360,7 @@ future<> cache_flat_mutation_reader::read_from_underlying(db::timeout_clock::tim
|
||||
|
||||
inline
|
||||
bool cache_flat_mutation_reader::ensure_population_lower_bound() {
|
||||
if (!_ck_ranges_curr->start()) {
|
||||
if (_population_range_starts_before_all_rows) {
|
||||
return true;
|
||||
}
|
||||
if (!_last_row.refresh(*_snp)) {
|
||||
@@ -406,6 +415,7 @@ inline
|
||||
void cache_flat_mutation_reader::maybe_add_to_cache(const clustering_row& cr) {
|
||||
if (!can_populate()) {
|
||||
_last_row = nullptr;
|
||||
_population_range_starts_before_all_rows = false;
|
||||
_read_context->cache().on_mispopulate();
|
||||
return;
|
||||
}
|
||||
@@ -439,6 +449,7 @@ void cache_flat_mutation_reader::maybe_add_to_cache(const clustering_row& cr) {
|
||||
with_allocator(standard_allocator(), [&] {
|
||||
_last_row = partition_snapshot_row_weakref(*_snp, it, true);
|
||||
});
|
||||
_population_range_starts_before_all_rows = false;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -67,6 +67,12 @@ class error_collector : public error_listener<RecognizerType, ExceptionBaseType>
|
||||
*/
|
||||
const sstring_view _query;
|
||||
|
||||
/**
|
||||
* An empty bitset to be used as a workaround for AntLR null dereference
|
||||
* bug.
|
||||
*/
|
||||
static typename ExceptionBaseType::BitsetListType _empty_bit_list;
|
||||
|
||||
public:
|
||||
|
||||
/**
|
||||
@@ -144,6 +150,14 @@ private:
|
||||
break;
|
||||
}
|
||||
default:
|
||||
// AntLR Exception class has a bug of dereferencing a null
|
||||
// pointer in the displayRecognitionError. The following
|
||||
// if statement makes sure it will not be null before the
|
||||
// call to that function (displayRecognitionError).
|
||||
// bug reference: https://github.com/antlr/antlr3/issues/191
|
||||
if (!ex->get_expectingSet()) {
|
||||
ex->set_expectingSet(&_empty_bit_list);
|
||||
}
|
||||
ex->displayRecognitionError(token_names, msg);
|
||||
}
|
||||
return msg.str();
|
||||
@@ -345,4 +359,8 @@ private:
|
||||
#endif
|
||||
};
|
||||
|
||||
template<typename RecognizerType, typename TokenType, typename ExceptionBaseType>
|
||||
typename ExceptionBaseType::BitsetListType
|
||||
error_collector<RecognizerType,TokenType,ExceptionBaseType>::_empty_bit_list = typename ExceptionBaseType::BitsetListType();
|
||||
|
||||
}
|
||||
|
||||
@@ -209,19 +209,18 @@ void query_options::prepare(const std::vector<::shared_ptr<column_specification>
|
||||
}
|
||||
|
||||
auto& names = *_names;
|
||||
std::vector<cql3::raw_value> ordered_values;
|
||||
std::vector<cql3::raw_value_view> ordered_values;
|
||||
ordered_values.reserve(specs.size());
|
||||
for (auto&& spec : specs) {
|
||||
auto& spec_name = spec->name->text();
|
||||
for (size_t j = 0; j < names.size(); j++) {
|
||||
if (names[j] == spec_name) {
|
||||
ordered_values.emplace_back(_values[j]);
|
||||
ordered_values.emplace_back(_value_views[j]);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
_values = std::move(ordered_values);
|
||||
fill_value_views();
|
||||
_value_views = std::move(ordered_values);
|
||||
}
|
||||
|
||||
void query_options::fill_value_views()
|
||||
|
||||
@@ -202,6 +202,14 @@ public:
|
||||
const query_options& options,
|
||||
gc_clock::time_point now) const override;
|
||||
|
||||
virtual std::vector<bytes_opt> values_raw(const query_options& options) const = 0;
|
||||
|
||||
virtual std::vector<bytes_opt> values(const query_options& options) const override {
|
||||
std::vector<bytes_opt> ret = values_raw(options);
|
||||
std::sort(ret.begin(),ret.end());
|
||||
ret.erase(std::unique(ret.begin(),ret.end()),ret.end());
|
||||
return ret;
|
||||
}
|
||||
#if 0
|
||||
@Override
|
||||
protected final boolean isSupportedBy(SecondaryIndex index)
|
||||
@@ -224,7 +232,7 @@ public:
|
||||
return abstract_restriction::term_uses_function(_values, ks_name, function_name);
|
||||
}
|
||||
|
||||
virtual std::vector<bytes_opt> values(const query_options& options) const override {
|
||||
virtual std::vector<bytes_opt> values_raw(const query_options& options) const override {
|
||||
std::vector<bytes_opt> ret;
|
||||
for (auto&& v : _values) {
|
||||
ret.emplace_back(to_bytes_opt(v->bind_and_get(options)));
|
||||
@@ -249,7 +257,7 @@ public:
|
||||
return false;
|
||||
}
|
||||
|
||||
virtual std::vector<bytes_opt> values(const query_options& options) const override {
|
||||
virtual std::vector<bytes_opt> values_raw(const query_options& options) const override {
|
||||
auto&& lval = dynamic_pointer_cast<multi_item_terminal>(_marker->bind(options));
|
||||
if (!lval) {
|
||||
throw exceptions::invalid_request_exception("Invalid null value for IN restriction");
|
||||
|
||||
@@ -105,9 +105,11 @@ public:
|
||||
virtual void reset() = 0;
|
||||
|
||||
virtual assignment_testable::test_result test_assignment(database& db, const sstring& keyspace, ::shared_ptr<column_specification> receiver) override {
|
||||
if (receiver->type == get_type()) {
|
||||
auto t1 = receiver->type->underlying_type();
|
||||
auto t2 = get_type()->underlying_type();
|
||||
if (t1 == t2) {
|
||||
return assignment_testable::test_result::EXACT_MATCH;
|
||||
} else if (receiver->type->is_value_compatible_with(*get_type())) {
|
||||
} else if (t1->is_value_compatible_with(*t2)) {
|
||||
return assignment_testable::test_result::WEAKLY_ASSIGNABLE;
|
||||
} else {
|
||||
return assignment_testable::test_result::NOT_ASSIGNABLE;
|
||||
|
||||
@@ -53,6 +53,9 @@ update_parameters::get_prefetched_list(
|
||||
return {};
|
||||
}
|
||||
|
||||
if (column.is_static()) {
|
||||
ckey = clustering_key_view::make_empty();
|
||||
}
|
||||
auto i = _prefetched->rows.find(std::make_pair(std::move(pkey), std::move(ckey)));
|
||||
if (i == _prefetched->rows.end()) {
|
||||
return {};
|
||||
|
||||
53
database.cc
53
database.cc
@@ -361,9 +361,13 @@ filter_sstable_for_reader(std::vector<sstables::shared_sstable>&& sstables, colu
|
||||
};
|
||||
sstables.erase(boost::remove_if(sstables, sstable_has_not_key), sstables.end());
|
||||
|
||||
// FIXME: Workaround for https://github.com/scylladb/scylla/issues/3552
|
||||
// and https://github.com/scylladb/scylla/issues/3553
|
||||
const bool filtering_broken = true;
|
||||
|
||||
// no clustering filtering is applied if schema defines no clustering key or
|
||||
// compaction strategy thinks it will not benefit from such an optimization.
|
||||
if (!schema->clustering_key_size() || !cf.get_compaction_strategy().use_clustering_key_filter()) {
|
||||
if (filtering_broken || !schema->clustering_key_size() || !cf.get_compaction_strategy().use_clustering_key_filter()) {
|
||||
return sstables;
|
||||
}
|
||||
::cf_stats* stats = cf.cf_stats();
|
||||
@@ -1633,9 +1637,9 @@ future<> distributed_loader::open_sstable(distributed<database>& db, sstables::e
|
||||
// to distribute evenly the resource usage among all shards.
|
||||
|
||||
return db.invoke_on(column_family::calculate_shard_from_sstable_generation(comps.generation),
|
||||
[&db, comps = std::move(comps), func = std::move(func), pc] (database& local) {
|
||||
[&db, comps = std::move(comps), func = std::move(func), &pc] (database& local) {
|
||||
|
||||
return with_semaphore(local.sstable_load_concurrency_sem(), 1, [&db, &local, comps = std::move(comps), func = std::move(func), pc] {
|
||||
return with_semaphore(local.sstable_load_concurrency_sem(), 1, [&db, &local, comps = std::move(comps), func = std::move(func), &pc] {
|
||||
auto& cf = local.find_column_family(comps.ks, comps.cf);
|
||||
|
||||
auto f = sstables::sstable::load_shared_components(cf.schema(), cf._config.datadir, comps.generation, comps.version, comps.format, pc);
|
||||
@@ -2159,6 +2163,11 @@ database::database(const db::config& cfg, database_config dbcfg)
|
||||
void backlog_controller::adjust() {
|
||||
auto backlog = _current_backlog();
|
||||
|
||||
if (backlog >= _control_points.back().input) {
|
||||
update_controller(_control_points.back().output);
|
||||
return;
|
||||
}
|
||||
|
||||
// interpolate to find out which region we are. This run infrequently and there are a fixed
|
||||
// number of points so a simple loop will do.
|
||||
size_t idx = 1;
|
||||
@@ -2808,6 +2817,7 @@ keyspace::make_column_family_config(const schema& s, const db::config& db_config
|
||||
cfg.enable_disk_writes = _config.enable_disk_writes;
|
||||
cfg.enable_commitlog = _config.enable_commitlog;
|
||||
cfg.enable_cache = _config.enable_cache;
|
||||
cfg.compaction_enforce_min_threshold = _config.compaction_enforce_min_threshold;
|
||||
cfg.dirty_memory_manager = _config.dirty_memory_manager;
|
||||
cfg.streaming_dirty_memory_manager = _config.streaming_dirty_memory_manager;
|
||||
cfg.read_concurrency_semaphore = _config.read_concurrency_semaphore;
|
||||
@@ -3577,6 +3587,7 @@ database::make_keyspace_config(const keyspace_metadata& ksm) {
|
||||
cfg.enable_commitlog = false;
|
||||
cfg.enable_cache = false;
|
||||
}
|
||||
cfg.compaction_enforce_min_threshold = _cfg->compaction_enforce_min_threshold();
|
||||
cfg.dirty_memory_manager = &_dirty_memory_manager;
|
||||
cfg.streaming_dirty_memory_manager = &_streaming_dirty_memory_manager;
|
||||
cfg.read_concurrency_semaphore = &_read_concurrency_sem;
|
||||
@@ -4537,16 +4548,14 @@ flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s,
|
||||
}
|
||||
return reader;
|
||||
};
|
||||
return make_combined_reader(s, std::make_unique<incremental_reader_selector>(s,
|
||||
std::move(sstables),
|
||||
pr,
|
||||
slice,
|
||||
pc,
|
||||
std::move(resource_tracker),
|
||||
std::move(trace_state),
|
||||
fwd,
|
||||
fwd_mr,
|
||||
std::move(reader_factory_fn)),
|
||||
auto all_readers = boost::copy_range<std::vector<flat_mutation_reader>>(
|
||||
*sstables->all()
|
||||
| boost::adaptors::transformed([&] (sstables::shared_sstable sst) -> flat_mutation_reader {
|
||||
return reader_factory_fn(sst, pr);
|
||||
})
|
||||
);
|
||||
return make_combined_reader(s,
|
||||
std::move(all_readers),
|
||||
fwd,
|
||||
fwd_mr);
|
||||
}
|
||||
@@ -4565,16 +4574,14 @@ flat_mutation_reader make_range_sstable_reader(schema_ptr s,
|
||||
auto reader_factory_fn = [s, &slice, &pc, resource_tracker, fwd, fwd_mr, &monitor_generator] (sstables::shared_sstable& sst, const dht::partition_range& pr) {
|
||||
return sst->read_range_rows_flat(s, pr, slice, pc, resource_tracker, fwd, fwd_mr, monitor_generator(sst));
|
||||
};
|
||||
return make_combined_reader(s, std::make_unique<incremental_reader_selector>(s,
|
||||
std::move(sstables),
|
||||
pr,
|
||||
slice,
|
||||
pc,
|
||||
std::move(resource_tracker),
|
||||
std::move(trace_state),
|
||||
fwd,
|
||||
fwd_mr,
|
||||
std::move(reader_factory_fn)),
|
||||
auto sstable_readers = boost::copy_range<std::vector<flat_mutation_reader>>(
|
||||
*sstables->all()
|
||||
| boost::adaptors::transformed([&] (sstables::shared_sstable sst) {
|
||||
return reader_factory_fn(sst, pr);
|
||||
})
|
||||
);
|
||||
return make_combined_reader(s,
|
||||
std::move(sstable_readers),
|
||||
fwd,
|
||||
fwd_mr);
|
||||
}
|
||||
|
||||
@@ -297,6 +297,7 @@ public:
|
||||
bool enable_cache = true;
|
||||
bool enable_commitlog = true;
|
||||
bool enable_incremental_backups = false;
|
||||
bool compaction_enforce_min_threshold = false;
|
||||
::dirty_memory_manager* dirty_memory_manager = &default_dirty_memory_manager;
|
||||
::dirty_memory_manager* streaming_dirty_memory_manager = &default_dirty_memory_manager;
|
||||
reader_concurrency_semaphore* read_concurrency_semaphore;
|
||||
@@ -735,6 +736,10 @@ public:
|
||||
_config.enable_incremental_backups = val;
|
||||
}
|
||||
|
||||
bool compaction_enforce_min_threshold() const {
|
||||
return _config.compaction_enforce_min_threshold;
|
||||
}
|
||||
|
||||
const sstables::sstable_set& get_sstable_set() const;
|
||||
lw_shared_ptr<sstable_list> get_sstables() const;
|
||||
lw_shared_ptr<sstable_list> get_sstables_including_compacted_undeleted() const;
|
||||
@@ -979,6 +984,7 @@ public:
|
||||
bool enable_disk_writes = true;
|
||||
bool enable_cache = true;
|
||||
bool enable_incremental_backups = false;
|
||||
bool compaction_enforce_min_threshold = false;
|
||||
::dirty_memory_manager* dirty_memory_manager = &default_dirty_memory_manager;
|
||||
::dirty_memory_manager* streaming_dirty_memory_manager = &default_dirty_memory_manager;
|
||||
reader_concurrency_semaphore* read_concurrency_semaphore;
|
||||
|
||||
@@ -125,6 +125,9 @@ public:
|
||||
val(compaction_static_shares, float, 0, Used, \
|
||||
"If set to higher than 0, ignore the controller's output and set the compaction shares statically. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity" \
|
||||
) \
|
||||
val(compaction_enforce_min_threshold, bool, false, Used, \
|
||||
"If set to true, enforce the min_threshold option for compactions strictly. If false (default), Scylla may decide to compact even if below min_threshold" \
|
||||
) \
|
||||
/* Initialization properties */ \
|
||||
/* The minimal properties needed for configuring a cluster. */ \
|
||||
val(cluster_name, sstring, "", Used, \
|
||||
|
||||
2
dist/ami/files/.bash_profile
vendored
2
dist/ami/files/.bash_profile
vendored
@@ -120,7 +120,7 @@ else
|
||||
fi
|
||||
fi
|
||||
echo -n " "
|
||||
/usr/lib/scylla/scylla_ec2_check
|
||||
/usr/lib/scylla/scylla_ec2_check --nic eth0
|
||||
if [ $? -eq 0 ]; then
|
||||
echo
|
||||
fi
|
||||
|
||||
30
dist/common/scripts/scylla_ec2_check
vendored
30
dist/common/scripts/scylla_ec2_check
vendored
@@ -2,6 +2,12 @@
|
||||
|
||||
. /usr/lib/scylla/scylla_lib.sh
|
||||
|
||||
print_usage() {
|
||||
echo "scylla_ec2_check --nic eth0"
|
||||
echo " --nic specify NIC"
|
||||
exit 1
|
||||
}
|
||||
|
||||
get_en_interface_type() {
|
||||
TYPE=`curl -s http://169.254.169.254/latest/meta-data/instance-type|cut -d . -f 1`
|
||||
SUBTYPE=`curl -s http://169.254.169.254/latest/meta-data/instance-type|cut -d . -f 2`
|
||||
@@ -18,7 +24,7 @@ get_en_interface_type() {
|
||||
}
|
||||
|
||||
is_vpc_enabled() {
|
||||
MAC=`cat /sys/class/net/eth0/address`
|
||||
MAC=`cat /sys/class/net/$1/address`
|
||||
VPC_AVAIL=`curl -s http://169.254.169.254/latest/meta-data/network/interfaces/macs/$MAC/|grep vpc-id`
|
||||
[ -n "$VPC_AVAIL" ]
|
||||
}
|
||||
@@ -27,9 +33,27 @@ if ! is_ec2; then
|
||||
exit 0
|
||||
fi
|
||||
|
||||
if [ $# -eq 0 ]; then
|
||||
print_usage
|
||||
fi
|
||||
while [ $# -gt 0 ]; do
|
||||
case "$1" in
|
||||
"--nic")
|
||||
verify_args $@
|
||||
NIC="$2"
|
||||
shift 2
|
||||
;;
|
||||
esac
|
||||
done
|
||||
|
||||
if ! is_valid_nic $NIC; then
|
||||
echo "NIC $NIC doesn't exist."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
TYPE=`curl -s http://169.254.169.254/latest/meta-data/instance-type`
|
||||
EN=`get_en_interface_type`
|
||||
DRIVER=`ethtool -i eth0|awk '/^driver:/ {print $2}'`
|
||||
DRIVER=`ethtool -i $NIC|awk '/^driver:/ {print $2}'`
|
||||
if [ "$EN" = "" ]; then
|
||||
tput setaf 1
|
||||
tput bold
|
||||
@@ -39,7 +63,7 @@ if [ "$EN" = "" ]; then
|
||||
echo "More documentation available at: "
|
||||
echo "http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/enhanced-networking.html#enabling_enhanced_networking"
|
||||
exit 1
|
||||
elif ! is_vpc_enabled; then
|
||||
elif ! is_vpc_enabled $NIC; then
|
||||
tput setaf 1
|
||||
tput bold
|
||||
echo "VPC is not enabled!"
|
||||
|
||||
4
dist/common/scripts/scylla_lib.sh
vendored
4
dist/common/scripts/scylla_lib.sh
vendored
@@ -91,6 +91,10 @@ create_perftune_conf() {
|
||||
/usr/lib/scylla/perftune.py --tune net --nic "$nic" $mode --dump-options-file > /etc/scylla.d/perftune.yaml
|
||||
}
|
||||
|
||||
is_valid_nic() {
|
||||
[ -d /sys/class/net/$1 ]
|
||||
}
|
||||
|
||||
. /etc/os-release
|
||||
if is_debian_variant || is_gentoo_variant; then
|
||||
SYSCONFIG=/etc/default
|
||||
|
||||
57
dist/common/scripts/scylla_setup
vendored
57
dist/common/scripts/scylla_setup
vendored
@@ -39,6 +39,27 @@ print_usage() {
|
||||
exit 1
|
||||
}
|
||||
|
||||
interactive_choose_nic() {
|
||||
NICS=$(for i in /sys/class/net/*;do nic=`basename $i`; if [ "$nic" != "lo" ]; then echo $nic; fi; done)
|
||||
NR_NICS=`echo $NICS|wc -w`
|
||||
if [ $NR_NICS -eq 0 ]; then
|
||||
echo "NIC not found."
|
||||
exit 1
|
||||
elif [ $NR_NICS -eq 1 ]; then
|
||||
NIC=$NICS
|
||||
else
|
||||
echo "Please select NIC from following list: "
|
||||
while true; do
|
||||
echo $NICS
|
||||
echo -n "> "
|
||||
read NIC
|
||||
if is_valid_nic $NIC; then
|
||||
break
|
||||
fi
|
||||
done
|
||||
fi
|
||||
}
|
||||
|
||||
interactive_ask_service() {
|
||||
echo $1
|
||||
echo $2
|
||||
@@ -112,14 +133,20 @@ run_setup_script() {
|
||||
name=$1
|
||||
shift 1
|
||||
$* &&:
|
||||
if [ $? -ne 0 ] && [ $INTERACTIVE -eq 1 ]; then
|
||||
printf "${RED}$name setup failed. press any key to continue...${NO_COLOR}\n"
|
||||
read
|
||||
return 1
|
||||
if [ $? -ne 0 ]; then
|
||||
if [ $INTERACTIVE -eq 1 ]; then
|
||||
printf "${RED}$name setup failed. press any key to continue...${NO_COLOR}\n"
|
||||
read
|
||||
return 1
|
||||
else
|
||||
printf "$name setup failed.\n"
|
||||
exit 1
|
||||
fi
|
||||
fi
|
||||
return 0
|
||||
}
|
||||
|
||||
NIC="eth0"
|
||||
AMI=0
|
||||
SET_NIC=0
|
||||
DEV_MODE=0
|
||||
@@ -260,7 +287,8 @@ if is_ec2; then
|
||||
EC2_CHECK=$?
|
||||
fi
|
||||
if [ $EC2_CHECK -eq 1 ]; then
|
||||
/usr/lib/scylla/scylla_ec2_check
|
||||
interactive_choose_nic
|
||||
/usr/lib/scylla/scylla_ec2_check --nic $NIC
|
||||
fi
|
||||
fi
|
||||
|
||||
@@ -447,24 +475,6 @@ if [ $INTERACTIVE -eq 1 ]; then
|
||||
interactive_ask_service "Do you want to setup sysconfig?" "Answer yes to do system wide configuration customized for Scylla. Answer no to do nothing." "yes" &&:
|
||||
SYSCONFIG_SETUP=$?
|
||||
if [ $SYSCONFIG_SETUP -eq 1 ]; then
|
||||
NICS=$(for i in /sys/class/net/*;do nic=`basename $i`; if [ "$nic" != "lo" ]; then echo $nic; fi; done)
|
||||
NR_NICS=`echo $NICS|wc -w`
|
||||
if [ $NR_NICS -eq 0 ]; then
|
||||
echo "NIC not found."
|
||||
exit 1
|
||||
elif [ $NR_NICS -eq 1 ]; then
|
||||
NIC=$NICS
|
||||
else
|
||||
echo "Please select NIC from following list: "
|
||||
while true; do
|
||||
echo $NICS
|
||||
echo -n "> "
|
||||
read NIC
|
||||
if [ -e /sys/class/net/$NIC ]; then
|
||||
break
|
||||
fi
|
||||
done
|
||||
fi
|
||||
interactive_ask_service "Do you want to optimize NIC queue settings?" "Answer yes to enable network card optimization and improve performance. Answer no to skip this optimization." "yes" &&:
|
||||
SET_NIC=$?
|
||||
fi
|
||||
@@ -474,6 +484,7 @@ if [ $SYSCONFIG_SETUP -eq 1 ]; then
|
||||
if [ $SET_NIC -eq 1 ]; then
|
||||
SETUP_ARGS="--setup-nic"
|
||||
fi
|
||||
interactive_choose_nic
|
||||
run_setup_script "NIC queue" /usr/lib/scylla/scylla_sysconfig_setup --nic $NIC $SETUP_ARGS
|
||||
fi
|
||||
|
||||
|
||||
24
dist/debian/build_deb.sh
vendored
24
dist/debian/build_deb.sh
vendored
@@ -2,10 +2,11 @@
|
||||
|
||||
. /etc/os-release
|
||||
print_usage() {
|
||||
echo "build_deb.sh -target <codename> --dist --rebuild-dep"
|
||||
echo "build_deb.sh -target <codename> --dist --rebuild-dep --jobs 2"
|
||||
echo " --target target distribution codename"
|
||||
echo " --dist create a public distribution package"
|
||||
echo " --no-clean don't rebuild pbuilder tgz"
|
||||
echo " --jobs specify number of jobs"
|
||||
exit 1
|
||||
}
|
||||
install_deps() {
|
||||
@@ -19,6 +20,7 @@ install_deps() {
|
||||
DIST=0
|
||||
TARGET=
|
||||
NO_CLEAN=0
|
||||
JOBS=0
|
||||
while [ $# -gt 0 ]; do
|
||||
case "$1" in
|
||||
"--dist")
|
||||
@@ -33,6 +35,10 @@ while [ $# -gt 0 ]; do
|
||||
NO_CLEAN=1
|
||||
shift 1
|
||||
;;
|
||||
"--jobs")
|
||||
JOBS=$2
|
||||
shift 2
|
||||
;;
|
||||
*)
|
||||
print_usage
|
||||
;;
|
||||
@@ -248,16 +254,18 @@ if [ "$TARGET" != "trusty" ]; then
|
||||
cp dist/common/systemd/node-exporter.service debian/scylla-server.node-exporter.service
|
||||
fi
|
||||
|
||||
sudo cp ./dist/debian/pbuilderrc ~root/.pbuilderrc
|
||||
if [ $NO_CLEAN -eq 0 ]; then
|
||||
sudo rm -fv /var/cache/pbuilder/scylla-server-$TARGET.tgz
|
||||
sudo -H DIST=$TARGET /usr/sbin/pbuilder clean
|
||||
sudo -H DIST=$TARGET /usr/sbin/pbuilder create --allow-untrusted
|
||||
sudo DIST=$TARGET /usr/sbin/pbuilder clean --configfile ./dist/debian/pbuilderrc
|
||||
sudo DIST=$TARGET /usr/sbin/pbuilder create --configfile ./dist/debian/pbuilderrc --allow-untrusted
|
||||
fi
|
||||
sudo -H DIST=$TARGET /usr/sbin/pbuilder update --allow-untrusted
|
||||
if [ $JOBS -ne 0 ]; then
|
||||
DEB_BUILD_OPTIONS="parallel=$JOBS"
|
||||
fi
|
||||
sudo -H DIST=$TARGET /usr/sbin/pbuilder update --configfile ./dist/debian/pbuilderrc --allow-untrusted
|
||||
if [ "$TARGET" = "trusty" ] || [ "$TARGET" = "xenial" ] || [ "$TARGET" = "yakkety" ] || [ "$TARGET" = "zesty" ] || [ "$TARGET" = "artful" ] || [ "$TARGET" = "bionic" ]; then
|
||||
sudo -H DIST=$TARGET /usr/sbin/pbuilder execute --save-after-exec dist/debian/ubuntu_enable_ppa.sh
|
||||
sudo DIST=$TARGET /usr/sbin/pbuilder execute --configfile ./dist/debian/pbuilderrc --save-after-exec dist/debian/ubuntu_enable_ppa.sh
|
||||
elif [ "$TARGET" = "jessie" ] || [ "$TARGET" = "stretch" ]; then
|
||||
sudo -H DIST=$TARGET /usr/sbin/pbuilder execute --save-after-exec dist/debian/debian_install_gpgkey.sh
|
||||
sudo DIST=$TARGET /usr/sbin/pbuilder execute --configfile ./dist/debian/pbuilderrc --save-after-exec dist/debian/debian_install_gpgkey.sh
|
||||
fi
|
||||
sudo -H DIST=$TARGET pdebuild --buildresult build/debs
|
||||
sudo -H DIST=$TARGET DEB_BUILD_OPTIONS=$DEB_BUILD_OPTIONS pdebuild --configfile ./dist/debian/pbuilderrc --buildresult build/debs
|
||||
|
||||
3
dist/debian/rules.in
vendored
3
dist/debian/rules.in
vendored
@@ -1,12 +1,13 @@
|
||||
#!/usr/bin/make -f
|
||||
|
||||
export PYBUILD_DISABLE=1
|
||||
jobs := $(shell echo $$DEB_BUILD_OPTIONS | sed -r "s/.*parallel=([0-9]+).*/-j\1/")
|
||||
|
||||
override_dh_auto_configure:
|
||||
./configure.py --with=scylla --with=iotune --enable-dpdk --mode=release --static-thrift --static-boost --static-yaml-cpp --compiler=@@COMPILER@@ --cflags="-I/opt/scylladb/include -L/opt/scylladb/lib/x86-linux-gnu/" --ldflags="-Wl,-rpath=/opt/scylladb/lib"
|
||||
|
||||
override_dh_auto_build:
|
||||
PATH="/opt/scylladb/bin:$$PATH" ninja
|
||||
PATH="/opt/scylladb/bin:$$PATH" ninja $(jobs)
|
||||
|
||||
override_dh_auto_clean:
|
||||
rm -rf build/release seastar/build
|
||||
|
||||
@@ -183,10 +183,7 @@ flat_mutation_reader make_delegating_reader(flat_mutation_reader& r) {
|
||||
flat_mutation_reader make_forwardable(flat_mutation_reader m) {
|
||||
class reader : public flat_mutation_reader::impl {
|
||||
flat_mutation_reader _underlying;
|
||||
position_range _current = {
|
||||
position_in_partition(position_in_partition::partition_start_tag_t()),
|
||||
position_in_partition(position_in_partition::after_static_row_tag_t())
|
||||
};
|
||||
position_range _current;
|
||||
mutation_fragment_opt _next;
|
||||
// When resolves, _next is engaged or _end_of_stream is set.
|
||||
future<> ensure_next() {
|
||||
@@ -201,7 +198,10 @@ flat_mutation_reader make_forwardable(flat_mutation_reader m) {
|
||||
});
|
||||
}
|
||||
public:
|
||||
reader(flat_mutation_reader r) : impl(r.schema()), _underlying(std::move(r)) { }
|
||||
reader(flat_mutation_reader r) : impl(r.schema()), _underlying(std::move(r)), _current({
|
||||
position_in_partition(position_in_partition::partition_start_tag_t()),
|
||||
position_in_partition(position_in_partition::after_static_row_tag_t())
|
||||
}) { }
|
||||
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override {
|
||||
return repeat([this] {
|
||||
if (is_buffer_full()) {
|
||||
|
||||
@@ -1005,7 +1005,7 @@ future<> gossiper::assassinate_endpoint(sstring address) {
|
||||
logger.warn("Assassinating {} via gossip", endpoint);
|
||||
if (es) {
|
||||
auto& ss = service::get_local_storage_service();
|
||||
auto tokens = ss.get_token_metadata().get_tokens(endpoint);
|
||||
tokens = ss.get_token_metadata().get_tokens(endpoint);
|
||||
if (tokens.empty()) {
|
||||
logger.warn("Unable to calculate tokens for {}. Will use a random one", address);
|
||||
throw std::runtime_error(sprint("Unable to calculate tokens for %s", endpoint));
|
||||
|
||||
4
keys.hh
4
keys.hh
@@ -721,6 +721,10 @@ public:
|
||||
static const compound& get_compound_type(const schema& s) {
|
||||
return s.clustering_key_prefix_type();
|
||||
}
|
||||
|
||||
static clustering_key_prefix_view make_empty() {
|
||||
return { bytes_view() };
|
||||
}
|
||||
};
|
||||
|
||||
class clustering_key_prefix : public prefix_compound_wrapper<clustering_key_prefix, clustering_key_prefix_view, clustering_key> {
|
||||
|
||||
@@ -119,9 +119,17 @@ insert_token_range_to_sorted_container_while_unwrapping(
|
||||
const dht::token& tok,
|
||||
dht::token_range_vector& ret) {
|
||||
if (prev_tok < tok) {
|
||||
ret.emplace_back(
|
||||
dht::token_range::bound(prev_tok, false),
|
||||
dht::token_range::bound(tok, true));
|
||||
auto pos = ret.end();
|
||||
if (!ret.empty() && !std::prev(pos)->end()) {
|
||||
// We inserted a wrapped range (a, b] previously as
|
||||
// (-inf, b], (a, +inf). So now we insert in the next-to-last
|
||||
// position to keep the last range (a, +inf) at the end.
|
||||
pos = std::prev(pos);
|
||||
}
|
||||
ret.insert(pos,
|
||||
dht::token_range{
|
||||
dht::token_range::bound(prev_tok, false),
|
||||
dht::token_range::bound(tok, true)});
|
||||
} else {
|
||||
ret.emplace_back(
|
||||
dht::token_range::bound(prev_tok, false),
|
||||
|
||||
@@ -100,7 +100,6 @@ future<> ec2_multi_region_snitch::gossiper_starting() {
|
||||
// Note: currently gossiper "main" instance always runs on CPU0 therefore
|
||||
// this function will be executed on CPU0 only.
|
||||
//
|
||||
ec2_snitch::gossiper_starting();
|
||||
|
||||
using namespace gms;
|
||||
auto& g = get_local_gossiper();
|
||||
|
||||
@@ -1089,7 +1089,7 @@ row::apply_monotonically(const column_definition& column, atomic_cell_or_collect
|
||||
if (_type == storage_type::vector && id < max_vector_size) {
|
||||
if (id >= _storage.vector.v.size()) {
|
||||
_storage.vector.v.resize(id);
|
||||
_storage.vector.v.emplace_back(cell_and_hash{std::move(value), std::move(hash)});
|
||||
_storage.vector.v.emplace_back(std::move(value), std::move(hash));
|
||||
_storage.vector.present.set(id);
|
||||
_size++;
|
||||
} else if (auto& cell_and_hash = _storage.vector.v[id]; !bool(cell_and_hash.cell)) {
|
||||
@@ -1753,9 +1753,10 @@ void mutation_querier::query_static_row(const row& r, tombstone current_tombston
|
||||
} else if (_short_reads_allowed) {
|
||||
seastar::measuring_output_stream stream;
|
||||
ser::qr_partition__static_row__cells<seastar::measuring_output_stream> out(stream, { });
|
||||
auto start = stream.size();
|
||||
get_compacted_row_slice(_schema, slice, column_kind::static_column,
|
||||
r, slice.static_columns, _static_cells_wr);
|
||||
_memory_accounter.update(stream.size());
|
||||
r, slice.static_columns, out);
|
||||
_memory_accounter.update(stream.size() - start);
|
||||
}
|
||||
if (_pw.requested_digest()) {
|
||||
max_timestamp max_ts{_pw.last_modified()};
|
||||
@@ -1816,8 +1817,9 @@ stop_iteration mutation_querier::consume(clustering_row&& cr, row_tombstone curr
|
||||
} else if (_short_reads_allowed) {
|
||||
seastar::measuring_output_stream stream;
|
||||
ser::qr_partition__rows<seastar::measuring_output_stream> out(stream, { });
|
||||
auto start = stream.size();
|
||||
write_row(out);
|
||||
stop = _memory_accounter.update_and_check(stream.size());
|
||||
stop = _memory_accounter.update_and_check(stream.size() - start);
|
||||
}
|
||||
|
||||
_live_clustering_rows++;
|
||||
|
||||
@@ -74,6 +74,17 @@ using cell_hash_opt = seastar::optimized_optional<cell_hash>;
|
||||
struct cell_and_hash {
|
||||
atomic_cell_or_collection cell;
|
||||
mutable cell_hash_opt hash;
|
||||
|
||||
cell_and_hash() = default;
|
||||
cell_and_hash(cell_and_hash&&) noexcept = default;
|
||||
cell_and_hash& operator=(cell_and_hash&&) noexcept = default;
|
||||
cell_and_hash(const cell_and_hash&) = default;
|
||||
cell_and_hash& operator=(const cell_and_hash&) = default;
|
||||
|
||||
cell_and_hash(atomic_cell_or_collection&& cell, cell_hash_opt hash)
|
||||
: cell(std::move(cell))
|
||||
, hash(hash)
|
||||
{ }
|
||||
};
|
||||
|
||||
//
|
||||
|
||||
@@ -273,6 +273,11 @@ public:
|
||||
return is_partition_end() || (_ck && _ck->is_empty(s) && _bound_weight > 0);
|
||||
}
|
||||
|
||||
bool is_before_all_clustered_rows(const schema& s) const {
|
||||
return _type < partition_region::clustered
|
||||
|| (_type == partition_region::clustered && _ck->is_empty(s) && _bound_weight < 0);
|
||||
}
|
||||
|
||||
template<typename Hasher>
|
||||
void feed_hash(Hasher& hasher, const schema& s) const {
|
||||
::feed_hash(hasher, _bound_weight);
|
||||
|
||||
81
querier.cc
81
querier.cc
@@ -152,34 +152,33 @@ const size_t querier_cache::max_queriers_memory_usage = memory::stats().total_me
|
||||
void querier_cache::scan_cache_entries() {
|
||||
const auto now = lowres_clock::now();
|
||||
|
||||
auto it = _meta_entries.begin();
|
||||
const auto end = _meta_entries.end();
|
||||
auto it = _entries.begin();
|
||||
const auto end = _entries.end();
|
||||
while (it != end && it->is_expired(now)) {
|
||||
if (*it) {
|
||||
++_stats.time_based_evictions;
|
||||
}
|
||||
it = _meta_entries.erase(it);
|
||||
_stats.population = _entries.size();
|
||||
++_stats.time_based_evictions;
|
||||
--_stats.population;
|
||||
it = _entries.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
querier_cache::entries::iterator querier_cache::find_querier(utils::UUID key, const dht::partition_range& range, tracing::trace_state_ptr trace_state) {
|
||||
const auto queriers = _entries.equal_range(key);
|
||||
const auto queriers = _index.equal_range(key);
|
||||
|
||||
if (queriers.first == _entries.end()) {
|
||||
if (queriers.first == _index.end()) {
|
||||
tracing::trace(trace_state, "Found no cached querier for key {}", key);
|
||||
return _entries.end();
|
||||
}
|
||||
|
||||
const auto it = std::find_if(queriers.first, queriers.second, [&] (const std::pair<const utils::UUID, entry>& elem) {
|
||||
return elem.second.get().matches(range);
|
||||
const auto it = std::find_if(queriers.first, queriers.second, [&] (const entry& e) {
|
||||
return e.value().matches(range);
|
||||
});
|
||||
|
||||
if (it == queriers.second) {
|
||||
tracing::trace(trace_state, "Found cached querier(s) for key {} but none matches the query range {}", key, range);
|
||||
return _entries.end();
|
||||
}
|
||||
tracing::trace(trace_state, "Found cached querier for key {} and range {}", key, range);
|
||||
return it;
|
||||
return it->pos();
|
||||
}
|
||||
|
||||
querier_cache::querier_cache(std::chrono::seconds entry_ttl)
|
||||
@@ -199,8 +198,7 @@ void querier_cache::insert(utils::UUID key, querier&& q, tracing::trace_state_pt
|
||||
|
||||
tracing::trace(trace_state, "Caching querier with key {}", key);
|
||||
|
||||
auto memory_usage = boost::accumulate(
|
||||
_entries | boost::adaptors::map_values | boost::adaptors::transformed(std::mem_fn(&querier_cache::entry::memory_usage)), size_t(0));
|
||||
auto memory_usage = boost::accumulate(_entries | boost::adaptors::transformed(std::mem_fn(&entry::memory_usage)), size_t(0));
|
||||
|
||||
// We add the memory-usage of the to-be added querier to the memory-usage
|
||||
// of all the cached queriers. We now need to make sure this number is
|
||||
@@ -210,20 +208,20 @@ void querier_cache::insert(utils::UUID key, querier&& q, tracing::trace_state_pt
|
||||
memory_usage += q.memory_usage();
|
||||
|
||||
if (memory_usage >= max_queriers_memory_usage) {
|
||||
auto it = _meta_entries.begin();
|
||||
const auto end = _meta_entries.end();
|
||||
auto it = _entries.begin();
|
||||
const auto end = _entries.end();
|
||||
while (it != end && memory_usage >= max_queriers_memory_usage) {
|
||||
if (*it) {
|
||||
++_stats.memory_based_evictions;
|
||||
memory_usage -= it->get_entry().memory_usage();
|
||||
}
|
||||
it = _meta_entries.erase(it);
|
||||
++_stats.memory_based_evictions;
|
||||
memory_usage -= it->memory_usage();
|
||||
--_stats.population;
|
||||
it = _entries.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
const auto it = _entries.emplace(key, entry::param{std::move(q), _entry_ttl}).first;
|
||||
_meta_entries.emplace_back(_entries, it);
|
||||
_stats.population = _entries.size();
|
||||
auto& e = _entries.emplace_back(key, std::move(q), lowres_clock::now() + _entry_ttl);
|
||||
e.set_pos(--_entries.end());
|
||||
_index.insert(e);
|
||||
++_stats.population;
|
||||
}
|
||||
|
||||
querier querier_cache::lookup(utils::UUID key,
|
||||
@@ -240,9 +238,9 @@ querier querier_cache::lookup(utils::UUID key,
|
||||
return create_fun();
|
||||
}
|
||||
|
||||
auto q = std::move(it->second).get();
|
||||
auto q = std::move(*it).value();
|
||||
_entries.erase(it);
|
||||
_stats.population = _entries.size();
|
||||
--_stats.population;
|
||||
|
||||
const auto can_be_used = q.can_be_used_for_page(only_live, s, range, slice);
|
||||
if (can_be_used == querier::can_use::yes) {
|
||||
@@ -265,25 +263,24 @@ bool querier_cache::evict_one() {
|
||||
return false;
|
||||
}
|
||||
|
||||
auto it = _meta_entries.begin();
|
||||
const auto end = _meta_entries.end();
|
||||
while (it != end) {
|
||||
const auto is_live = bool(*it);
|
||||
it = _meta_entries.erase(it);
|
||||
_stats.population = _entries.size();
|
||||
if (is_live) {
|
||||
++_stats.resource_based_evictions;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
++_stats.resource_based_evictions;
|
||||
--_stats.population;
|
||||
_entries.pop_front();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void querier_cache::evict_all_for_table(const utils::UUID& schema_id) {
|
||||
_meta_entries.remove_if([&] (const meta_entry& me) {
|
||||
return !me || me.get_entry().get().schema()->id() == schema_id;
|
||||
});
|
||||
_stats.population = _entries.size();
|
||||
auto it = _entries.begin();
|
||||
const auto end = _entries.end();
|
||||
while (it != end) {
|
||||
if (it->schema().id() == schema_id) {
|
||||
--_stats.population;
|
||||
it = _entries.erase(it);
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
querier_cache_context::querier_cache_context(querier_cache& cache, utils::UUID key, bool is_first_page)
|
||||
|
||||
103
querier.hh
103
querier.hh
@@ -24,7 +24,8 @@
|
||||
#include "mutation_compactor.hh"
|
||||
#include "mutation_reader.hh"
|
||||
|
||||
#include <seastar/core/weak_ptr.hh>
|
||||
#include <boost/intrusive/set.hpp>
|
||||
|
||||
#include <variant>
|
||||
|
||||
/// One-stop object for serving queries.
|
||||
@@ -264,75 +265,65 @@ public:
|
||||
};
|
||||
|
||||
private:
|
||||
class entry : public weakly_referencable<entry> {
|
||||
querier _querier;
|
||||
lowres_clock::time_point _expires;
|
||||
public:
|
||||
// Since entry cannot be moved and unordered_map::emplace can pass only
|
||||
// a single param to its mapped-type we need to force a single-param
|
||||
// constructor for entry. Oh C++...
|
||||
struct param {
|
||||
querier q;
|
||||
std::chrono::seconds ttl;
|
||||
};
|
||||
class entry : public boost::intrusive::set_base_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>> {
|
||||
// Self reference so that we can remove the entry given an `entry&`.
|
||||
std::list<entry>::iterator _pos;
|
||||
const utils::UUID _key;
|
||||
const lowres_clock::time_point _expires;
|
||||
querier _value;
|
||||
|
||||
explicit entry(param p)
|
||||
: _querier(std::move(p.q))
|
||||
, _expires(lowres_clock::now() + p.ttl) {
|
||||
public:
|
||||
entry(utils::UUID key, querier q, lowres_clock::time_point expires)
|
||||
: _key(key)
|
||||
, _expires(expires)
|
||||
, _value(std::move(q)) {
|
||||
}
|
||||
|
||||
std::list<entry>::iterator pos() const {
|
||||
return _pos;
|
||||
}
|
||||
|
||||
void set_pos(std::list<entry>::iterator pos) {
|
||||
_pos = pos;
|
||||
}
|
||||
|
||||
const utils::UUID& key() const {
|
||||
return _key;
|
||||
}
|
||||
|
||||
const ::schema& schema() const {
|
||||
return *_value.schema();
|
||||
}
|
||||
|
||||
bool is_expired(const lowres_clock::time_point& now) const {
|
||||
return _expires <= now;
|
||||
}
|
||||
|
||||
const querier& get() const & {
|
||||
return _querier;
|
||||
}
|
||||
|
||||
querier&& get() && {
|
||||
return std::move(_querier);
|
||||
}
|
||||
|
||||
size_t memory_usage() const {
|
||||
return _querier.memory_usage();
|
||||
return _value.memory_usage();
|
||||
}
|
||||
|
||||
const querier& value() const & {
|
||||
return _value;
|
||||
}
|
||||
|
||||
querier value() && {
|
||||
return std::move(_value);
|
||||
}
|
||||
};
|
||||
|
||||
using entries = std::unordered_map<utils::UUID, entry>;
|
||||
|
||||
class meta_entry {
|
||||
entries& _entries;
|
||||
weak_ptr<entry> _entry_ptr;
|
||||
entries::iterator _entry_it;
|
||||
|
||||
public:
|
||||
meta_entry(entries& e, entries::iterator it)
|
||||
: _entries(e)
|
||||
, _entry_ptr(it->second.weak_from_this())
|
||||
, _entry_it(it) {
|
||||
}
|
||||
|
||||
~meta_entry() {
|
||||
if (_entry_ptr) {
|
||||
_entries.erase(_entry_it);
|
||||
}
|
||||
}
|
||||
|
||||
bool is_expired(const lowres_clock::time_point& now) const {
|
||||
return !_entry_ptr || _entry_ptr->is_expired(now);
|
||||
}
|
||||
|
||||
explicit operator bool() const {
|
||||
return bool(_entry_ptr);
|
||||
}
|
||||
|
||||
const entry& get_entry() const {
|
||||
return *_entry_ptr;
|
||||
}
|
||||
struct key_of_entry {
|
||||
using type = utils::UUID;
|
||||
const type& operator()(const entry& e) { return e.key(); }
|
||||
};
|
||||
|
||||
using entries = std::list<entry>;
|
||||
using index = boost::intrusive::multiset<entry, boost::intrusive::key_of_value<key_of_entry>,
|
||||
boost::intrusive::constant_time_size<false>>;
|
||||
|
||||
private:
|
||||
entries _entries;
|
||||
std::list<meta_entry> _meta_entries;
|
||||
index _index;
|
||||
timer<lowres_clock> _expiry_timer;
|
||||
std::chrono::seconds _entry_ttl;
|
||||
stats _stats;
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 6f61d7456e...88cb58cfbf
@@ -144,7 +144,11 @@ future<lowres_clock::duration> cache_hitrate_calculator::recalculate_hitrates()
|
||||
return _db.invoke_on_all([this, rates = std::move(rates), cpuid = engine().cpu_id()] (database& db) {
|
||||
sstring gstate;
|
||||
for (auto& cf : db.get_column_families() | boost::adaptors::filtered(non_system_filter)) {
|
||||
stat s = rates.at(cf.first);
|
||||
auto it = rates.find(cf.first);
|
||||
if (it == rates.end()) { // a table may be added before map/reduce completes and this code runs
|
||||
continue;
|
||||
}
|
||||
stat s = it->second;
|
||||
float rate = 0;
|
||||
if (s.h) {
|
||||
rate = s.h / (s.h + s.m);
|
||||
|
||||
@@ -83,7 +83,7 @@ private:
|
||||
_last_replicas = state->get_last_replicas();
|
||||
} else {
|
||||
// Reusing readers is currently only supported for singular queries.
|
||||
if (_ranges.front().is_singular()) {
|
||||
if (!_ranges.empty() && query::is_single_partition(_ranges.front())) {
|
||||
_cmd->query_uuid = utils::make_random_uuid();
|
||||
}
|
||||
_cmd->is_first_page = true;
|
||||
|
||||
@@ -3220,9 +3220,22 @@ storage_proxy::query_partition_key_range(lw_shared_ptr<query::read_command> cmd,
|
||||
slogger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
|
||||
result_rows_per_range, cmd->row_limit, ranges.size(), concurrency_factor);
|
||||
|
||||
// The call to `query_partition_key_range_concurrent()` below
|
||||
// updates `cmd` directly when processing the results. Under
|
||||
// some circumstances, when the query executes without deferring,
|
||||
// this updating will happen before the lambda object is constructed
|
||||
// and hence the updates will be visible to the lambda. This will
|
||||
// result in the merger below trimming the results according to the
|
||||
// updated (decremented) limits and causing the paging logic to
|
||||
// declare the query exhausted due to the non-full page. To avoid
|
||||
// this, save the original values of the limits here and pass these
|
||||
// to the lambda below.
|
||||
const auto row_limit = cmd->row_limit;
|
||||
const auto partition_limit = cmd->partition_limit;
|
||||
|
||||
return query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, ranges.begin(), std::move(ranges), concurrency_factor,
|
||||
std::move(trace_state), cmd->row_limit, cmd->partition_limit)
|
||||
.then([row_limit = cmd->row_limit, partition_limit = cmd->partition_limit](std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results) {
|
||||
.then([row_limit, partition_limit](std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results) {
|
||||
query::result_merger merger(row_limit, partition_limit);
|
||||
merger.reserve(results.size());
|
||||
|
||||
@@ -3585,6 +3598,7 @@ future<> storage_proxy::truncate_blocking(sstring keyspace, sstring cfname) {
|
||||
std::rethrow_exception(ep);
|
||||
} catch (rpc::timeout_error& e) {
|
||||
slogger.trace("Truncation of {} timed out: {}", cfname, e.what());
|
||||
throw;
|
||||
} catch (...) {
|
||||
throw;
|
||||
}
|
||||
|
||||
@@ -33,6 +33,7 @@
|
||||
#include "unimplemented.hh"
|
||||
#include "stdx.hh"
|
||||
#include "segmented_compress_params.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
@@ -299,7 +300,8 @@ size_t local_compression::compress_max_size(size_t input_len) const {
|
||||
|
||||
void compression::set_compressor(compressor_ptr c) {
|
||||
if (c) {
|
||||
auto& cn = c->name();
|
||||
unqualified_name uqn(compressor::namespace_prefix, c->name());
|
||||
const sstring& cn = uqn;
|
||||
name.value = bytes(cn.begin(), cn.end());
|
||||
for (auto& p : c->options()) {
|
||||
if (p.first != compression_parameters::SSTABLE_COMPRESSION) {
|
||||
|
||||
@@ -294,6 +294,12 @@ size_tiered_compaction_strategy::get_sstables_for_compaction(column_family& cfs,
|
||||
return sstables::compaction_descriptor(std::move(most_interesting));
|
||||
}
|
||||
|
||||
// If we are not enforcing min_threshold explicitly, try any pair of SStables in the same tier.
|
||||
if (!cfs.compaction_enforce_min_threshold() && is_any_bucket_interesting(buckets, 2)) {
|
||||
std::vector<sstables::shared_sstable> most_interesting = most_interesting_bucket(std::move(buckets), 2, max_threshold);
|
||||
return sstables::compaction_descriptor(std::move(most_interesting));
|
||||
}
|
||||
|
||||
// if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
|
||||
// ratio is greater than threshold.
|
||||
// prefer oldest sstables from biggest size tiers because they will be easier to satisfy conditions for
|
||||
|
||||
@@ -215,3 +215,22 @@ SEASTAR_TEST_CASE(test_aggregate_count) {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_reverse_type_aggregation) {
|
||||
return do_with_cql_env_thread([&] (auto& e) {
|
||||
e.execute_cql("CREATE TABLE test(p int, c timestamp, v int, primary key (p, c)) with clustering order by (c desc)").get();
|
||||
e.execute_cql("INSERT INTO test(p, c, v) VALUES (1, 1, 1)").get();
|
||||
e.execute_cql("INSERT INTO test(p, c, v) VALUES (1, 2, 1)").get();
|
||||
|
||||
{
|
||||
auto tp = db_clock::from_time_t({ 0 }) + std::chrono::milliseconds(1);
|
||||
auto msg = e.execute_cql("SELECT min(c) FROM test").get0();
|
||||
assert_that(msg).is_rows().with_size(1).with_row({{timestamp_type->decompose(tp)}});
|
||||
}
|
||||
{
|
||||
auto tp = db_clock::from_time_t({ 0 }) + std::chrono::milliseconds(2);
|
||||
auto msg = e.execute_cql("SELECT max(c) FROM test").get0();
|
||||
assert_that(msg).is_rows().with_size(1).with_row({{timestamp_type->decompose(tp)}});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2047,10 +2047,9 @@ SEASTAR_TEST_CASE(test_in_restriction) {
|
||||
assert_that(msg).is_rows().with_size(0);
|
||||
return e.execute_cql("select r1 from tir where p1 in (2, 0, 2, 1);");
|
||||
}).then([&e] (shared_ptr<cql_transport::messages::result_message> msg) {
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
assert_that(msg).is_rows().with_rows_ignore_order({
|
||||
{int32_type->decompose(4)},
|
||||
{int32_type->decompose(0)},
|
||||
{int32_type->decompose(4)},
|
||||
{int32_type->decompose(1)},
|
||||
{int32_type->decompose(2)},
|
||||
{int32_type->decompose(3)},
|
||||
@@ -2072,6 +2071,22 @@ SEASTAR_TEST_CASE(test_in_restriction) {
|
||||
{int32_type->decompose(2)},
|
||||
{int32_type->decompose(1)},
|
||||
});
|
||||
return e.prepare("select r1 from tir where p1 in ?");
|
||||
}).then([&e] (cql3::prepared_cache_key_type prepared_id){
|
||||
auto my_list_type = list_type_impl::get_instance(int32_type, true);
|
||||
std::vector<cql3::raw_value> raw_values;
|
||||
auto in_values_list = my_list_type->decompose(make_list_value(my_list_type,
|
||||
list_type_impl::native_type{{int(2), int(0), int(2), int(1)}}));
|
||||
raw_values.emplace_back(cql3::raw_value::make_value(in_values_list));
|
||||
return e.execute_prepared(prepared_id,raw_values);
|
||||
}).then([&e] (shared_ptr<cql_transport::messages::result_message> msg) {
|
||||
assert_that(msg).is_rows().with_rows_ignore_order({
|
||||
{int32_type->decompose(4)},
|
||||
{int32_type->decompose(0)},
|
||||
{int32_type->decompose(1)},
|
||||
{int32_type->decompose(2)},
|
||||
{int32_type->decompose(3)},
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -2607,3 +2622,81 @@ SEASTAR_TEST_CASE(test_insert_large_collection_values) {
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Corner-case test that checks for the paging code's preparedness for an empty
|
||||
// range list.
|
||||
SEASTAR_TEST_CASE(test_empty_partition_range_scan) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
e.execute_cql("create keyspace empty_partition_range_scan with replication = {'class': 'SimpleStrategy', 'replication_factor': 1};").get();
|
||||
e.execute_cql("create table empty_partition_range_scan.tb (a int, b int, c int, val int, PRIMARY KEY ((a,b),c) );").get();
|
||||
|
||||
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
|
||||
auto res = e.execute_cql("select * from empty_partition_range_scan.tb where token (a,b) > 1 and token(a,b) <= 1;", std::move(qo)).get0();
|
||||
assert_that(res).is_rows().is_empty();
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_static_multi_cell_static_lists_with_ckey) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
e.execute_cql("CREATE TABLE t (p int, c int, slist list<int> static, v int, PRIMARY KEY (p, c));").get();
|
||||
e.execute_cql("INSERT INTO t (p, c, slist, v) VALUES (1, 1, [1], 1); ").get();
|
||||
|
||||
{
|
||||
e.execute_cql("UPDATE t SET slist[0] = 3, v = 3 WHERE p = 1 AND c = 1;").get();
|
||||
auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0();
|
||||
auto slist_type = list_type_impl::get_instance(int32_type, true);
|
||||
assert_that(msg).is_rows().with_row({
|
||||
{ slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({{3}}))) },
|
||||
{ int32_type->decompose(3) }
|
||||
});
|
||||
}
|
||||
{
|
||||
e.execute_cql("UPDATE t SET slist = [4], v = 4 WHERE p = 1 AND c = 1;").get();
|
||||
auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0();
|
||||
auto slist_type = list_type_impl::get_instance(int32_type, true);
|
||||
assert_that(msg).is_rows().with_row({
|
||||
{ slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({{4}}))) },
|
||||
{ int32_type->decompose(4) }
|
||||
});
|
||||
}
|
||||
{
|
||||
e.execute_cql("UPDATE t SET slist = [3] + slist , v = 5 WHERE p = 1 AND c = 1;").get();
|
||||
auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0();
|
||||
auto slist_type = list_type_impl::get_instance(int32_type, true);
|
||||
assert_that(msg).is_rows().with_row({
|
||||
{ slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({3, 4}))) },
|
||||
{ int32_type->decompose(5) }
|
||||
});
|
||||
}
|
||||
{
|
||||
e.execute_cql("UPDATE t SET slist = slist + [5] , v = 6 WHERE p = 1 AND c = 1;").get();
|
||||
auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0();
|
||||
auto slist_type = list_type_impl::get_instance(int32_type, true);
|
||||
assert_that(msg).is_rows().with_row({
|
||||
{ slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({3, 4, 5}))) },
|
||||
{ int32_type->decompose(6) }
|
||||
});
|
||||
}
|
||||
{
|
||||
e.execute_cql("DELETE slist[2] from t WHERE p = 1;").get();
|
||||
auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0();
|
||||
auto slist_type = list_type_impl::get_instance(int32_type, true);
|
||||
assert_that(msg).is_rows().with_row({
|
||||
{ slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({3, 4}))) },
|
||||
{ int32_type->decompose(6) }
|
||||
});
|
||||
}
|
||||
{
|
||||
e.execute_cql("UPDATE t SET slist = slist - [4] , v = 7 WHERE p = 1 AND c = 1;").get();
|
||||
auto msg = e.execute_cql("SELECT slist, v FROM t WHERE p = 1 AND c = 1;").get0();
|
||||
auto slist_type = list_type_impl::get_instance(int32_type, true);
|
||||
assert_that(msg).is_rows().with_row({
|
||||
{ slist_type->decompose(make_list_value(slist_type, list_type_impl::native_type({3}))) },
|
||||
{ int32_type->decompose(7) }
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -29,6 +29,9 @@
|
||||
#include "database.hh"
|
||||
#include "partition_slice_builder.hh"
|
||||
#include "frozen_mutation.hh"
|
||||
#include "mutation_source_test.hh"
|
||||
#include "schema_registry.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
|
||||
SEASTAR_TEST_CASE(test_querying_with_limits) {
|
||||
return do_with_cql_env([](cql_test_env& e) {
|
||||
@@ -74,3 +77,33 @@ SEASTAR_TEST_CASE(test_querying_with_limits) {
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_source) {
|
||||
do_with_cql_env([] (cql_test_env& e) {
|
||||
run_mutation_source_tests([&] (schema_ptr s, const std::vector<mutation>& partitions) -> mutation_source {
|
||||
try {
|
||||
e.local_db().find_column_family(s->ks_name(), s->cf_name());
|
||||
service::get_local_migration_manager().announce_column_family_drop(s->ks_name(), s->cf_name(), true).get();
|
||||
} catch (const no_such_column_family&) {
|
||||
// expected
|
||||
}
|
||||
service::get_local_migration_manager().announce_new_column_family(s, true).get();
|
||||
column_family& cf = e.local_db().find_column_family(s);
|
||||
for (auto&& m : partitions) {
|
||||
e.local_db().apply(cf.schema(), freeze(m)).get();
|
||||
}
|
||||
cf.flush().get();
|
||||
cf.get_row_cache().invalidate([] {}).get();
|
||||
return mutation_source([&] (schema_ptr s,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return cf.make_reader(s, range, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
});
|
||||
});
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
}
|
||||
|
||||
@@ -26,11 +26,13 @@
|
||||
|
||||
#include <boost/test/unit_test.hpp>
|
||||
#include <query-result-set.hh>
|
||||
#include <query-result-writer.hh>
|
||||
|
||||
#include "tests/test_services.hh"
|
||||
#include "tests/test-utils.hh"
|
||||
#include "tests/mutation_assertions.hh"
|
||||
#include "tests/result_set_assertions.hh"
|
||||
#include "tests/mutation_source_test.hh"
|
||||
|
||||
#include "mutation_query.hh"
|
||||
#include "core/do_with.hh"
|
||||
@@ -525,3 +527,22 @@ SEASTAR_TEST_CASE(test_partition_limit) {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_result_size_calculation) {
|
||||
random_mutation_generator gen(random_mutation_generator::generate_counters::no);
|
||||
std::vector<mutation> mutations = gen(1);
|
||||
schema_ptr s = gen.schema();
|
||||
mutation_source source = make_source(std::move(mutations));
|
||||
query::result_memory_limiter l;
|
||||
query::partition_slice slice = make_full_slice(*s);
|
||||
slice.options.set<query::partition_slice::option::allow_short_read>();
|
||||
|
||||
query::result::builder digest_only_builder(slice, query::result_options{query::result_request::only_digest, query::digest_algorithm::xxHash}, l.new_digest_read(query::result_memory_limiter::maximum_result_size).get0());
|
||||
data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(), gc_clock::now(), digest_only_builder).get0();
|
||||
|
||||
query::result::builder result_and_digest_builder(slice, query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash}, l.new_data_read(query::result_memory_limiter::maximum_result_size).get0());
|
||||
data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(), gc_clock::now(), result_and_digest_builder).get0();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(digest_only_builder.memory_accounter().used_memory(), result_and_digest_builder.memory_accounter().used_memory());
|
||||
}
|
||||
|
||||
|
||||
@@ -659,6 +659,46 @@ void test_mutation_reader_fragments_have_monotonic_positions(populate_fn populat
|
||||
});
|
||||
}
|
||||
|
||||
static void test_date_tiered_clustering_slicing(populate_fn populate) {
|
||||
BOOST_TEST_MESSAGE(__PRETTY_FUNCTION__);
|
||||
|
||||
simple_schema ss;
|
||||
|
||||
auto s = schema_builder(ss.schema())
|
||||
.set_compaction_strategy(sstables::compaction_strategy_type::date_tiered)
|
||||
.build();
|
||||
|
||||
auto pkey = ss.make_pkey();
|
||||
|
||||
mutation m1(s, pkey);
|
||||
ss.add_static_row(m1, "s");
|
||||
m1.partition().apply(ss.new_tombstone());
|
||||
ss.add_row(m1, ss.make_ckey(0), "v1");
|
||||
|
||||
mutation_source ms = populate(s, {m1});
|
||||
|
||||
// query row outside the range of existing rows to exercise sstable clustering key filter
|
||||
{
|
||||
auto slice = partition_slice_builder(*s)
|
||||
.with_range(ss.make_ckey_range(1, 2))
|
||||
.build();
|
||||
auto prange = dht::partition_range::make_singular(pkey);
|
||||
assert_that(ms.make_reader(s, prange, slice))
|
||||
.produces(m1, slice.row_ranges(*s, pkey.key()))
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
|
||||
{
|
||||
auto slice = partition_slice_builder(*s)
|
||||
.with_range(query::clustering_range::make_singular(ss.make_ckey(0)))
|
||||
.build();
|
||||
auto prange = dht::partition_range::make_singular(pkey);
|
||||
assert_that(ms.make_reader(s, prange, slice))
|
||||
.produces(m1)
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
}
|
||||
|
||||
static void test_clustering_slices(populate_fn populate) {
|
||||
BOOST_TEST_MESSAGE(__PRETTY_FUNCTION__);
|
||||
auto s = schema_builder("ks", "cf")
|
||||
@@ -1012,6 +1052,7 @@ void test_slicing_with_overlapping_range_tombstones(populate_fn populate) {
|
||||
}
|
||||
|
||||
void run_mutation_reader_tests(populate_fn populate) {
|
||||
test_date_tiered_clustering_slicing(populate);
|
||||
test_fast_forwarding_across_partitions_to_empty_range(populate);
|
||||
test_clustering_slices(populate);
|
||||
test_mutation_reader_fragments_have_monotonic_positions(populate);
|
||||
|
||||
@@ -30,6 +30,7 @@
|
||||
#include <map>
|
||||
#include <iostream>
|
||||
#include <sstream>
|
||||
#include <boost/range/algorithm/adjacent_find.hpp>
|
||||
|
||||
static logging::logger nlogger("NetworkTopologyStrategyLogger");
|
||||
|
||||
@@ -52,6 +53,26 @@ void print_natural_endpoints(double point, const std::vector<inet_address> v) {
|
||||
nlogger.debug("{}", strm.str());
|
||||
}
|
||||
|
||||
#ifndef DEBUG
|
||||
static void verify_sorted(const dht::token_range_vector& trv) {
|
||||
auto not_strictly_before = [] (const dht::token_range a, const dht::token_range b) {
|
||||
return !b.start()
|
||||
|| !a.end()
|
||||
|| a.end()->value() > b.start()->value()
|
||||
|| (a.end()->value() == b.start()->value() && a.end()->is_inclusive() && b.start()->is_inclusive());
|
||||
};
|
||||
BOOST_CHECK(boost::adjacent_find(trv, not_strictly_before) == trv.end());
|
||||
}
|
||||
#endif
|
||||
|
||||
static void check_ranges_are_sorted(abstract_replication_strategy* ars, gms::inet_address ep) {
|
||||
// Too slow in debug mode
|
||||
#ifndef DEBUG
|
||||
verify_sorted(ars->get_ranges(ep));
|
||||
verify_sorted(ars->get_primary_ranges(ep));
|
||||
#endif
|
||||
}
|
||||
|
||||
void strategy_sanity_check(
|
||||
abstract_replication_strategy* ars_ptr,
|
||||
const std::map<sstring, sstring>& options) {
|
||||
@@ -150,6 +171,7 @@ void full_ring_check(const std::vector<ring_point>& ring_points,
|
||||
auto endpoints2 = ars_ptr->get_natural_endpoints(t2);
|
||||
|
||||
endpoints_check(ars_ptr, endpoints2);
|
||||
check_ranges_are_sorted(ars_ptr, rp.host);
|
||||
BOOST_CHECK(cache_hit_count + 1 == ars_ptr->get_cache_hits_count());
|
||||
BOOST_CHECK(endpoints1 == endpoints2);
|
||||
}
|
||||
|
||||
@@ -518,7 +518,7 @@ SEASTAR_THREAD_TEST_CASE(test_memory_based_cache_eviction) {
|
||||
}, 24h);
|
||||
|
||||
size_t i = 0;
|
||||
const auto entry = t.produce_first_page_and_save_querier(i);
|
||||
const auto entry = t.produce_first_page_and_save_querier(i++);
|
||||
|
||||
const size_t queriers_needed_to_fill_cache = floor(querier_cache::max_queriers_memory_usage / entry.memory_usage);
|
||||
|
||||
|
||||
@@ -3012,11 +3012,13 @@ SEASTAR_TEST_CASE(test_concurrent_reads_and_eviction) {
|
||||
slice, actual, ::join(",\n", possible_versions)));
|
||||
}
|
||||
}
|
||||
}).finally([&, id] {
|
||||
done = true;
|
||||
});
|
||||
});
|
||||
|
||||
int n_updates = 100;
|
||||
while (!readers.available() && n_updates--) {
|
||||
while (!done && n_updates--) {
|
||||
auto m2 = gen();
|
||||
m2.partition().make_fully_continuous();
|
||||
|
||||
@@ -3034,8 +3036,8 @@ SEASTAR_TEST_CASE(test_concurrent_reads_and_eviction) {
|
||||
tracker.region().evict_some();
|
||||
|
||||
// Don't allow backlog to grow too much to avoid bad_alloc
|
||||
const auto max_active_versions = 10;
|
||||
while (versions.size() > max_active_versions) {
|
||||
const auto max_active_versions = 7;
|
||||
while (!done && versions.size() > max_active_versions) {
|
||||
later().get();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -775,3 +775,7 @@ FUNC_START(__crc32_vpmsum)
|
||||
|
||||
FUNC_END(__crc32_vpmsum)
|
||||
#endif
|
||||
|
||||
// Mark the stack as non-executable so the final executable won't
|
||||
// have an executable stack
|
||||
.section .note.GNU-stack,"",@progbits
|
||||
|
||||
@@ -164,7 +164,7 @@ class unqualified_name {
|
||||
public:
|
||||
// can be optimized with string_views etc.
|
||||
unqualified_name(const sstring& pkg_pfx, const sstring& name)
|
||||
: _qname(name.compare(0, pkg_pfx.size(), pkg_pfx) == 0 ? name.substr(pkg_pfx.size() + 1) : name)
|
||||
: _qname(name.compare(0, pkg_pfx.size(), pkg_pfx) == 0 ? name.substr(pkg_pfx.size()) : name)
|
||||
{}
|
||||
operator const sstring&() const {
|
||||
return _qname;
|
||||
|
||||
@@ -68,10 +68,11 @@ public:
|
||||
|
||||
// Starts a new phase and waits for all operations started in any of the earlier phases.
|
||||
// It is fine to start multiple awaits in parallel.
|
||||
// Strong exception guarantees.
|
||||
future<> advance_and_await() {
|
||||
auto new_gate = make_lw_shared<gate>();
|
||||
++_phase;
|
||||
auto old_gate = std::move(_gate);
|
||||
_gate = make_lw_shared<gate>();
|
||||
auto old_gate = std::exchange(_gate, std::move(new_gate));
|
||||
return old_gate->close().then([old_gate, op = start()] {});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user