Compare commits
84 Commits
copilot/fi
...
branch-1.4
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9d9d0258a2 | ||
|
|
05b6b459a0 | ||
|
|
49b43131f4 | ||
|
|
faba7b2ad4 | ||
|
|
c5d8c61652 | ||
|
|
6b58100900 | ||
|
|
48b7e7a368 | ||
|
|
81f8b8d47b | ||
|
|
b10dcfc5f6 | ||
|
|
fbf029acb3 | ||
|
|
a68721f67b | ||
|
|
55dc520405 | ||
|
|
53c3219e56 | ||
|
|
13eb7a6213 | ||
|
|
3f94dead9e | ||
|
|
4513d12311 | ||
|
|
c427193428 | ||
|
|
55b5149c17 | ||
|
|
4f6beb96f3 | ||
|
|
102792ee4b | ||
|
|
125c39d8d1 | ||
|
|
38e78bb8a2 | ||
|
|
58504bda5b | ||
|
|
be9e419a32 | ||
|
|
7647acd201 | ||
|
|
f35c088363 | ||
|
|
f19fca6b8b | ||
|
|
0d6223580c | ||
|
|
6152f091d4 | ||
|
|
9bc54bcf5e | ||
|
|
d6c9abd9ae | ||
|
|
b862c30bc0 | ||
|
|
87fedbbb9d | ||
|
|
8bc1c87cfd | ||
|
|
44448b3ab9 | ||
|
|
419045548d | ||
|
|
25830dd6ac | ||
|
|
7a6dd3c56d | ||
|
|
739bc54246 | ||
|
|
092b214a2e | ||
|
|
5cca752ebb | ||
|
|
5990044158 | ||
|
|
e8c804f4f9 | ||
|
|
ce9468a95d | ||
|
|
a7616b9116 | ||
|
|
2f8a846b46 | ||
|
|
5ef32112ae | ||
|
|
36ad8a8fd8 | ||
|
|
ef012856a5 | ||
|
|
e87bed5816 | ||
|
|
577ffc5851 | ||
|
|
a4fffc9c5d | ||
|
|
10ba47674a | ||
|
|
1c278d9abf | ||
|
|
be0b5ad962 | ||
|
|
707b59100c | ||
|
|
dc8fa5090d | ||
|
|
ea9a8e7f65 | ||
|
|
8d91b8652f | ||
|
|
830df18df5 | ||
|
|
ced171c28b | ||
|
|
34bf40b552 | ||
|
|
a68d829644 | ||
|
|
551c4ff965 | ||
|
|
19b35e812b | ||
|
|
d9ac058bff | ||
|
|
766367a6c5 | ||
|
|
7ac9b6e9ca | ||
|
|
b5bab524e1 | ||
|
|
f4bb52096b | ||
|
|
af26e7a691 | ||
|
|
770c982541 | ||
|
|
2e22c027b2 | ||
|
|
d2c0a5c318 | ||
|
|
52e2688c7b | ||
|
|
08b71b25c1 | ||
|
|
9682b36cdc | ||
|
|
e953955515 | ||
|
|
c1f93c461c | ||
|
|
c0d32a8297 | ||
|
|
a9bd9289a4 | ||
|
|
a2feaa998c | ||
|
|
11950dcba3 | ||
|
|
08ce047792 |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#!/bin/sh
|
||||
|
||||
VERSION=666.development
|
||||
VERSION=1.4.3
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -777,7 +777,7 @@
|
||||
]
|
||||
},
|
||||
{
|
||||
"path": "/storage_proxy/metrics/read/moving_avrage_histogram",
|
||||
"path": "/storage_proxy/metrics/read/moving_average_histogram",
|
||||
"operations": [
|
||||
{
|
||||
"method": "GET",
|
||||
@@ -792,7 +792,7 @@
|
||||
]
|
||||
},
|
||||
{
|
||||
"path": "/storage_proxy/metrics/range/moving_avrage_histogram",
|
||||
"path": "/storage_proxy/metrics/range/moving_average_histogram",
|
||||
"operations": [
|
||||
{
|
||||
"method": "GET",
|
||||
@@ -942,7 +942,7 @@
|
||||
]
|
||||
},
|
||||
{
|
||||
"path": "/storage_proxy/metrics/write/moving_avrage_histogram",
|
||||
"path": "/storage_proxy/metrics/write/moving_average_histogram",
|
||||
"operations": [
|
||||
{
|
||||
"method": "GET",
|
||||
|
||||
@@ -194,7 +194,7 @@ void set_cache_service(http_context& ctx, routes& r) {
|
||||
});
|
||||
|
||||
cs::get_row_capacity.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_reduce_cf(ctx, 0, [](const column_family& cf) {
|
||||
return map_reduce_cf(ctx, uint64_t(0), [](const column_family& cf) {
|
||||
return cf.get_row_cache().get_cache_tracker().region().occupancy().used_space();
|
||||
}, std::plus<uint64_t>());
|
||||
});
|
||||
|
||||
@@ -684,8 +684,8 @@ void set_storage_service(http_context& ctx, routes& r) {
|
||||
ss::get_slow_query_info.set(r, [](const_req req) {
|
||||
ss::slow_query_info res;
|
||||
res.enable = tracing::tracing::get_local_tracing_instance().slow_query_tracing_enabled();
|
||||
res.ttl = std::chrono::duration_cast<std::chrono::microseconds>(tracing::tracing::get_local_tracing_instance().slow_query_record_ttl()).count() ;
|
||||
res.threshold = std::chrono::duration_cast<std::chrono::microseconds>(tracing::tracing::get_local_tracing_instance().slow_query_threshold()).count();
|
||||
res.ttl = tracing::tracing::get_local_tracing_instance().slow_query_record_ttl().count() ;
|
||||
res.threshold = tracing::tracing::get_local_tracing_instance().slow_query_threshold().count();
|
||||
return res;
|
||||
});
|
||||
|
||||
|
||||
@@ -47,11 +47,8 @@
|
||||
const sstring auth::data_resource::ROOT_NAME("data");
|
||||
|
||||
auth::data_resource::data_resource(level l, const sstring& ks, const sstring& cf)
|
||||
: _ks(ks), _cf(cf)
|
||||
: _level(l), _ks(ks), _cf(cf)
|
||||
{
|
||||
if (l != get_level()) {
|
||||
throw std::invalid_argument("level/keyspace/column mismatch");
|
||||
}
|
||||
}
|
||||
|
||||
auth::data_resource::data_resource()
|
||||
@@ -67,14 +64,7 @@ auth::data_resource::data_resource(const sstring& ks, const sstring& cf)
|
||||
{}
|
||||
|
||||
auth::data_resource::level auth::data_resource::get_level() const {
|
||||
if (!_cf.empty()) {
|
||||
assert(!_ks.empty());
|
||||
return level::COLUMN_FAMILY;
|
||||
}
|
||||
if (!_ks.empty()) {
|
||||
return level::KEYSPACE;
|
||||
}
|
||||
return level::ROOT;
|
||||
return _level;
|
||||
}
|
||||
|
||||
auth::data_resource auth::data_resource::from_name(
|
||||
|
||||
@@ -56,6 +56,7 @@ private:
|
||||
|
||||
static const sstring ROOT_NAME;
|
||||
|
||||
level _level;
|
||||
sstring _ks;
|
||||
sstring _cf;
|
||||
|
||||
|
||||
@@ -218,12 +218,12 @@ future<::shared_ptr<auth::authenticated_user> > auth::password_authenticator::au
|
||||
// obsolete prepared statements pretty quickly.
|
||||
// Rely on query processing caching statements instead, and lets assume
|
||||
// that a map lookup string->statement is not gonna kill us much.
|
||||
auto& qp = cql3::get_local_query_processor();
|
||||
return qp.process(
|
||||
sprint("SELECT %s FROM %s.%s WHERE %s = ?", SALTED_HASH,
|
||||
auth::AUTH_KS, CREDENTIALS_CF, USER_NAME),
|
||||
consistency_for_user(username), { username }, true).then_wrapped(
|
||||
[=](future<::shared_ptr<cql3::untyped_result_set>> f) {
|
||||
return futurize_apply([this, username, password] {
|
||||
auto& qp = cql3::get_local_query_processor();
|
||||
return qp.process(sprint("SELECT %s FROM %s.%s WHERE %s = ?", SALTED_HASH,
|
||||
auth::AUTH_KS, CREDENTIALS_CF, USER_NAME),
|
||||
consistency_for_user(username), {username}, true);
|
||||
}).then_wrapped([=](future<::shared_ptr<cql3::untyped_result_set>> f) {
|
||||
try {
|
||||
auto res = f.get0();
|
||||
if (res->empty() || !checkpw(password, res->one().get_as<sstring>(SALTED_HASH))) {
|
||||
@@ -234,6 +234,8 @@ future<::shared_ptr<auth::authenticated_user> > auth::password_authenticator::au
|
||||
std::throw_with_nested(exceptions::authentication_exception("Could not verify password"));
|
||||
} catch (exceptions::request_execution_exception& e) {
|
||||
std::throw_with_nested(exceptions::authentication_exception(e.what()));
|
||||
} catch (...) {
|
||||
std::throw_with_nested(exceptions::authentication_exception("authentication failed"));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -40,6 +40,7 @@
|
||||
*/
|
||||
|
||||
#include <unordered_map>
|
||||
#include <boost/algorithm/string.hpp>
|
||||
#include "permission.hh"
|
||||
|
||||
const auth::permission_set auth::permissions::ALL_DATA =
|
||||
@@ -75,7 +76,9 @@ const sstring& auth::permissions::to_string(permission p) {
|
||||
}
|
||||
|
||||
auth::permission auth::permissions::from_string(const sstring& s) {
|
||||
return permission_names.at(s);
|
||||
sstring upper(s);
|
||||
boost::to_upper(upper);
|
||||
return permission_names.at(upper);
|
||||
}
|
||||
|
||||
std::unordered_set<sstring> auth::permissions::to_strings(const permission_set& set) {
|
||||
|
||||
@@ -54,6 +54,10 @@ public:
|
||||
// Return a list of sstables to be compacted after applying the strategy.
|
||||
compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<lw_shared_ptr<sstable>> candidates);
|
||||
|
||||
// Some strategies may look at the compacted and resulting sstables to
|
||||
// get some useful information for subsequent compactions.
|
||||
void notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added);
|
||||
|
||||
// Return if parallel compaction is allowed by strategy.
|
||||
bool parallel_compaction() const;
|
||||
|
||||
|
||||
@@ -222,6 +222,7 @@ scylla_tests = [
|
||||
'tests/database_test',
|
||||
'tests/nonwrapping_range_test',
|
||||
'tests/input_stream_test',
|
||||
'tests/sstable_atomic_deletion_test',
|
||||
]
|
||||
|
||||
apps = [
|
||||
@@ -309,6 +310,7 @@ scylla_core = (['database.cc',
|
||||
'sstables/compaction.cc',
|
||||
'sstables/compaction_strategy.cc',
|
||||
'sstables/compaction_manager.cc',
|
||||
'sstables/atomic_deletion.cc',
|
||||
'transport/event.cc',
|
||||
'transport/event_notifier.cc',
|
||||
'transport/server.cc',
|
||||
|
||||
@@ -232,7 +232,7 @@ uint32_t selection::add_column_for_ordering(const column_definition& c) {
|
||||
raw_selector::to_selectables(raw_selectors, schema), db, schema, defs);
|
||||
|
||||
auto metadata = collect_metadata(schema, raw_selectors, *factories);
|
||||
if (processes_selection(raw_selectors)) {
|
||||
if (processes_selection(raw_selectors) || raw_selectors.size() != defs.size()) {
|
||||
return ::make_shared<selection_with_processing>(schema, std::move(defs), std::move(metadata), std::move(factories));
|
||||
} else {
|
||||
return ::make_shared<simple_selection>(schema, std::move(defs), std::move(metadata), false);
|
||||
|
||||
32
database.cc
32
database.cc
@@ -427,8 +427,14 @@ column_family::make_sstable_reader(schema_ptr s,
|
||||
tracing::trace_state_ptr trace_state) const {
|
||||
// restricts a reader's concurrency if the configuration specifies it
|
||||
auto restrict_reader = [&] (mutation_reader&& in) {
|
||||
if (_config.read_concurrency_config.sem) {
|
||||
return make_restricted_reader(_config.read_concurrency_config, 1, std::move(in));
|
||||
auto&& config = [this, &pc] () -> const restricted_mutation_reader_config& {
|
||||
if (service::get_local_streaming_read_priority().id() == pc.id()) {
|
||||
return _config.streaming_read_concurrency_config;
|
||||
}
|
||||
return _config.read_concurrency_config;
|
||||
}();
|
||||
if (config.sem) {
|
||||
return make_restricted_reader(config, 1, std::move(in));
|
||||
} else {
|
||||
return std::move(in);
|
||||
}
|
||||
@@ -1241,10 +1247,17 @@ column_family::rebuild_sstable_list(const std::vector<sstables::shared_sstable>&
|
||||
// Second, delete the old sstables. This is done in the background, so we can
|
||||
// consider this compaction completed.
|
||||
seastar::with_gate(_sstable_deletion_gate, [this, sstables_to_remove] {
|
||||
return sstables::delete_atomically(sstables_to_remove).then([this, sstables_to_remove] {
|
||||
auto current_sstables = _sstables;
|
||||
auto new_sstable_list = make_lw_shared<sstable_list>();
|
||||
return sstables::delete_atomically(sstables_to_remove).then_wrapped([this, sstables_to_remove] (future<> f) {
|
||||
std::exception_ptr eptr;
|
||||
try {
|
||||
f.get();
|
||||
} catch(...) {
|
||||
eptr = std::current_exception();
|
||||
}
|
||||
|
||||
// unconditionally remove compacted sstables from _sstables_compacted_but_not_deleted,
|
||||
// or they could stay forever in the set, resulting in deleted files remaining
|
||||
// opened and disk space not being released until shutdown.
|
||||
std::unordered_set<sstables::shared_sstable> s(
|
||||
sstables_to_remove.begin(), sstables_to_remove.end());
|
||||
auto e = boost::range::remove_if(_sstables_compacted_but_not_deleted, [&] (sstables::shared_sstable sst) -> bool {
|
||||
@@ -1252,6 +1265,11 @@ column_family::rebuild_sstable_list(const std::vector<sstables::shared_sstable>&
|
||||
});
|
||||
_sstables_compacted_but_not_deleted.erase(e, _sstables_compacted_but_not_deleted.end());
|
||||
rebuild_statistics();
|
||||
|
||||
if (eptr) {
|
||||
return make_exception_future<>(eptr);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}).handle_exception([] (std::exception_ptr e) {
|
||||
try {
|
||||
std::rethrow_exception(e);
|
||||
@@ -1283,6 +1301,7 @@ column_family::compact_sstables(sstables::compaction_descriptor descriptor, bool
|
||||
};
|
||||
return sstables::compact_sstables(*sstables_to_compact, *this, create_sstable, descriptor.max_sstable_bytes, descriptor.level,
|
||||
cleanup).then([this, sstables_to_compact] (auto new_sstables) {
|
||||
_compaction_strategy.notify_completion(*sstables_to_compact, new_sstables);
|
||||
return this->rebuild_sstable_list(new_sstables, *sstables_to_compact);
|
||||
});
|
||||
});
|
||||
@@ -2070,6 +2089,7 @@ keyspace::make_column_family_config(const schema& s, const db::config& db_config
|
||||
cfg.dirty_memory_manager = _config.dirty_memory_manager;
|
||||
cfg.streaming_dirty_memory_manager = _config.streaming_dirty_memory_manager;
|
||||
cfg.read_concurrency_config = _config.read_concurrency_config;
|
||||
cfg.streaming_read_concurrency_config = _config.streaming_read_concurrency_config;
|
||||
cfg.cf_stats = _config.cf_stats;
|
||||
cfg.enable_incremental_backups = _config.enable_incremental_backups;
|
||||
cfg.max_cached_partition_size_in_bytes = db_config.max_cached_partition_size_in_kb() * 1024;
|
||||
@@ -2559,6 +2579,8 @@ database::make_keyspace_config(const keyspace_metadata& ksm) {
|
||||
++_stats->sstable_read_queue_overloaded;
|
||||
throw std::runtime_error("sstable inactive read queue overloaded");
|
||||
};
|
||||
cfg.streaming_read_concurrency_config = cfg.read_concurrency_config;
|
||||
cfg.streaming_read_concurrency_config.timeout = {};
|
||||
cfg.cf_stats = &_cf_stats;
|
||||
cfg.enable_incremental_backups = _enable_incremental_backups;
|
||||
return cfg;
|
||||
|
||||
@@ -325,6 +325,7 @@ public:
|
||||
::dirty_memory_manager* dirty_memory_manager = &default_dirty_memory_manager;
|
||||
::dirty_memory_manager* streaming_dirty_memory_manager = &default_dirty_memory_manager;
|
||||
restricted_mutation_reader_config read_concurrency_config;
|
||||
restricted_mutation_reader_config streaming_read_concurrency_config;
|
||||
::cf_stats* cf_stats = nullptr;
|
||||
uint64_t max_cached_partition_size_in_bytes;
|
||||
};
|
||||
@@ -879,6 +880,7 @@ public:
|
||||
::dirty_memory_manager* dirty_memory_manager = &default_dirty_memory_manager;
|
||||
::dirty_memory_manager* streaming_dirty_memory_manager = &default_dirty_memory_manager;
|
||||
restricted_mutation_reader_config read_concurrency_config;
|
||||
restricted_mutation_reader_config streaming_read_concurrency_config;
|
||||
::cf_stats* cf_stats = nullptr;
|
||||
};
|
||||
private:
|
||||
|
||||
@@ -1557,6 +1557,15 @@ db::commitlog::read_log_file(const sstring& filename, commit_load_reader_func ne
|
||||
subscription<temporary_buffer<char>, db::replay_position>
|
||||
db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type off) {
|
||||
struct work {
|
||||
private:
|
||||
file_input_stream_options make_file_input_stream_options() {
|
||||
file_input_stream_options fo;
|
||||
fo.buffer_size = db::commitlog::segment::default_size;
|
||||
fo.read_ahead = 10;
|
||||
fo.io_priority_class = service::get_local_commitlog_priority();
|
||||
return fo;
|
||||
}
|
||||
public:
|
||||
file f;
|
||||
stream<temporary_buffer<char>, replay_position> s;
|
||||
input_stream<char> fin;
|
||||
@@ -1572,7 +1581,7 @@ db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type
|
||||
bool header = true;
|
||||
|
||||
work(file f, position_type o = 0)
|
||||
: f(f), fin(make_file_input_stream(f)), start_off(o) {
|
||||
: f(f), fin(make_file_input_stream(f, o, make_file_input_stream_options())), start_off(o) {
|
||||
}
|
||||
work(work&&) = default;
|
||||
|
||||
@@ -1755,6 +1764,8 @@ db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type
|
||||
throw segment_data_corruption_error("Data corruption", corrupt_size);
|
||||
}
|
||||
});
|
||||
}).finally([this] {
|
||||
return fin.close();
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
2
dist/ami/build_ami.sh
vendored
2
dist/ami/build_ami.sh
vendored
@@ -33,7 +33,7 @@ done
|
||||
. /etc/os-release
|
||||
case "$ID" in
|
||||
"centos")
|
||||
AMI=ami-f3102499
|
||||
AMI=ami-4e1d5b59
|
||||
REGION=us-east-1
|
||||
SSH_USERNAME=centos
|
||||
;;
|
||||
|
||||
4
dist/common/scripts/scylla_io_setup
vendored
4
dist/common/scripts/scylla_io_setup
vendored
@@ -44,8 +44,8 @@ output_to_user()
|
||||
}
|
||||
|
||||
if [ `is_developer_mode` -eq 0 ]; then
|
||||
SMP=`echo $CPUSET|grep smp|sed -e "s/^.*smp\(\s\+\|=\)\([0-9]*\).*$/\2/"`
|
||||
CPUSET=`echo $CPUSET|grep cpuset|sed -e "s/^.*\(--cpuset\(\s\+\|=\)[0-9\-]*\).*$/\1/"`
|
||||
SMP=`echo $CPUSET|grep smp|sed -e "s/^.*smp\(\s\+\|=\)\([^ ]*\).*$/\2/"`
|
||||
CPUSET=`echo $CPUSET|grep cpuset|sed -e "s/^.*\(--cpuset\(\s\+\|=\)[^ ]*\).*$/\1/"`
|
||||
if [ $AMI_OPT -eq 1 ]; then
|
||||
NR_CPU=`cat /proc/cpuinfo |grep processor|wc -l`
|
||||
NR_DISKS=`lsblk --list --nodeps --noheadings | grep -v xvda | grep xvd | wc -l`
|
||||
|
||||
2
dist/common/scripts/scylla_kernel_check
vendored
2
dist/common/scripts/scylla_kernel_check
vendored
@@ -30,6 +30,6 @@ else
|
||||
else
|
||||
echo "Please upgrade to a newer kernel version."
|
||||
fi
|
||||
echo " see http://docs.scylladb.com/kb/kb-fs-not-qualified-aio/ for details"
|
||||
echo " see http://www.scylladb.com/kb/kb-fs-not-qualified-aio/ for details"
|
||||
fi
|
||||
exit $RET
|
||||
|
||||
31
dist/common/scripts/scylla_setup
vendored
31
dist/common/scripts/scylla_setup
vendored
@@ -84,7 +84,7 @@ get_unused_disks() {
|
||||
if [ -f /usr/sbin/pvs ]; then
|
||||
count_pvs=$(pvs|grep $dev|wc -l)
|
||||
fi
|
||||
count_swap=$(swapon --show |grep `realpath $dev`|wc -l)
|
||||
count_swap=$(swapon -s |grep `realpath $dev`|wc -l)
|
||||
if [ $count_raw -eq 0 -a $count_pvs -eq 0 -a $count_swap -eq 0 ]; then
|
||||
echo -n "$dev "
|
||||
fi
|
||||
@@ -226,31 +226,32 @@ fi
|
||||
if [ $INTERACTIVE -eq 1 ]; then
|
||||
interactive_ask_service "Do you want to enable ScyllaDB services?" "Answer yes to automatically start Scylla when the node boots; answer no to skip this step." "yes" &&:
|
||||
ENABLE_SERVICE=$?
|
||||
if [ $ENABLE_SERVICE -eq 1 ] && [ ! -f /etc/scylla.d/housekeeping.cfg ]; then
|
||||
interactive_ask_service "Do you want to enable ScyllaDB version check?" "Answer yes to automatically start Scylla-housekeeping that check for newer version, when the node boots; answer no to skip this step." "yes" &&:
|
||||
ENABLE_CHECK_VERSION=$?
|
||||
fi
|
||||
fi
|
||||
|
||||
if [ $ENABLE_SERVICE -eq 1 ]; then
|
||||
if [ "$ID" = "fedora" ] || [ "$ID" = "centos" ] || [ "$ID" = "ubuntu" -a "$VERSION_ID" != "14.04" ]; then
|
||||
systemctl enable scylla-server.service
|
||||
systemctl enable collectd.service
|
||||
if [ $ENABLE_CHECK_VERSION -eq 1 ]; then
|
||||
systemctl unmask scylla-housekeeping.timer
|
||||
else
|
||||
systemctl mask scylla-housekeeping.timer
|
||||
systemctl stop scylla-housekeeping.timer || true
|
||||
fi
|
||||
fi
|
||||
if [ $INTERACTIVE -eq 1 ] && [ ! -f /etc/scylla.d/housekeeping.cfg ]; then
|
||||
interactive_ask_service "Do you want to enable ScyllaDB version check?" "Answer yes to automatically start Scylla-housekeeping that check for newer version, when the node boots; answer no to skip this step." "yes" &&:
|
||||
ENABLE_CHECK_VERSION=$?
|
||||
fi
|
||||
if [ $ENABLE_CHECK_VERSION -eq 1 ]; then
|
||||
if [ ! -f /etc/scylla.d/housekeeping.cfg ]; then
|
||||
printf "[housekeeping]\ncheck-version: True\n" > /etc/scylla.d/housekeeping.cfg
|
||||
fi
|
||||
if [ "$ID" = "fedora" ] || [ "$ID" = "centos" ] || [ "$ID" = "ubuntu" -a "$VERSION_ID" != "14.04" ]; then
|
||||
systemctl unmask scylla-housekeeping.timer
|
||||
fi
|
||||
else
|
||||
if [ ! -f /etc/scylla.d/housekeeping.cfg ]; then
|
||||
printf "[housekeeping]\ncheck-version: False\n" > /etc/scylla.d/housekeeping.cfg
|
||||
fi
|
||||
if [ "$ID" = "fedora" ] || [ "$ID" = "centos" ] || [ "$ID" = "ubuntu" -a "$VERSION_ID" != "14.04" ]; then
|
||||
systemctl mask scylla-housekeeping.timer
|
||||
systemctl stop scylla-housekeeping.timer || true
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
|
||||
@@ -374,6 +375,10 @@ if [ $INTERACTIVE -eq 1 ]; then
|
||||
IO_SETUP=$?
|
||||
fi
|
||||
|
||||
if [ $IO_SETUP -eq 1 ]; then
|
||||
/usr/lib/scylla/scylla_io_setup
|
||||
fi
|
||||
|
||||
if [ $INTERACTIVE -eq 1 ]; then
|
||||
interactive_ask_service "Do you want to install node exporter, that exports prometheus data from the node?" "Answer yes to install it; answer no to skip this installation." "yes" &&:
|
||||
NODE_EXPORTER=$?
|
||||
@@ -383,10 +388,6 @@ if [ $NODE_EXPORTER -eq 1 ]; then
|
||||
/usr/lib/scylla/node_exporter_install
|
||||
fi
|
||||
|
||||
if [ $IO_SETUP -eq 1 ]; then
|
||||
/usr/lib/scylla/scylla_io_setup
|
||||
fi
|
||||
|
||||
if [ $DEV_MODE -eq 1 ]; then
|
||||
/usr/lib/scylla/scylla_dev_mode_setup --developer-mode 1
|
||||
fi
|
||||
|
||||
2
dist/common/systemd/node-exporter.service
vendored
2
dist/common/systemd/node-exporter.service
vendored
@@ -3,8 +3,6 @@ Description=Node Exporter
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
User=scylla
|
||||
Group=scylla
|
||||
ExecStart=/usr/bin/node_exporter
|
||||
|
||||
[Install]
|
||||
|
||||
@@ -4,7 +4,8 @@ After=scylla-server.service
|
||||
BindsTo=scylla-server.service
|
||||
|
||||
[Timer]
|
||||
OnBootSec=0
|
||||
# set OnActiveSec to 3 to safely avoid issues/1846
|
||||
OnActiveSec=3
|
||||
OnUnitActiveSec=1d
|
||||
|
||||
[Install]
|
||||
|
||||
2
dist/docker/redhat/Dockerfile
vendored
2
dist/docker/redhat/Dockerfile
vendored
@@ -7,7 +7,7 @@ ENV container docker
|
||||
VOLUME [ "/sys/fs/cgroup" ]
|
||||
|
||||
#install scylla
|
||||
RUN curl http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo -o /etc/yum.repos.d/scylla.repo
|
||||
RUN curl http://downloads.scylladb.com/rpm/centos/scylla-1.4.repo -o /etc/yum.repos.d/scylla.repo
|
||||
RUN yum -y install epel-release
|
||||
RUN yum -y clean expire-cache
|
||||
RUN yum -y update
|
||||
|
||||
1
dist/docker/redhat/commandlineparser.py
vendored
1
dist/docker/redhat/commandlineparser.py
vendored
@@ -9,6 +9,7 @@ def parse():
|
||||
parser.add_argument('--smp', default=None, help="e.g --smp 2 to use two CPUs")
|
||||
parser.add_argument('--memory', default=None, help="e.g. --memory 1G to use 1 GB of RAM")
|
||||
parser.add_argument('--overprovisioned', default='0', choices=['0', '1'], help="run in overprovisioned environment")
|
||||
parser.add_argument('--listen-address', default=None, dest='listenAddress')
|
||||
parser.add_argument('--broadcast-address', default=None, dest='broadcastAddress')
|
||||
parser.add_argument('--broadcast-rpc-address', default=None, dest='broadcastRpcAddress')
|
||||
return parser.parse_args()
|
||||
|
||||
10
dist/docker/redhat/scyllasetup.py
vendored
10
dist/docker/redhat/scyllasetup.py
vendored
@@ -8,6 +8,7 @@ class ScyllaSetup:
|
||||
self._developerMode = arguments.developerMode
|
||||
self._seeds = arguments.seeds
|
||||
self._cpuset = arguments.cpuset
|
||||
self._listenAddress = arguments.listenAddress
|
||||
self._broadcastAddress = arguments.broadcastAddress
|
||||
self._broadcastRpcAddress = arguments.broadcastRpcAddress
|
||||
self._smp = arguments.smp
|
||||
@@ -31,14 +32,15 @@ class ScyllaSetup:
|
||||
|
||||
def scyllaYAML(self):
|
||||
configuration = yaml.load(open('/etc/scylla/scylla.yaml'))
|
||||
IP = subprocess.check_output(['hostname', '-i']).decode('ascii').strip()
|
||||
configuration['listen_address'] = IP
|
||||
configuration['rpc_address'] = IP
|
||||
if self._listenAddress is None:
|
||||
self._listenAddress = subprocess.check_output(['hostname', '-i']).decode('ascii').strip()
|
||||
configuration['listen_address'] = self._listenAddress
|
||||
configuration['rpc_address'] = self._listenAddress
|
||||
if self._seeds is None:
|
||||
if self._broadcastAddress is not None:
|
||||
self._seeds = self._broadcastAddress
|
||||
else:
|
||||
self._seeds = IP
|
||||
self._seeds = self._listenAddress
|
||||
configuration['seed_provider'] = [
|
||||
{'class_name': 'org.apache.cassandra.locator.SimpleSeedProvider',
|
||||
'parameters': [{'seeds': self._seeds}]}
|
||||
|
||||
6
dist/redhat/scylla.spec.in
vendored
6
dist/redhat/scylla.spec.in
vendored
@@ -27,7 +27,7 @@ Group: Applications/Databases
|
||||
Summary: The Scylla database server
|
||||
License: AGPLv3
|
||||
URL: http://www.scylladb.com/
|
||||
BuildRequires: libaio-devel libstdc++-devel cryptopp-devel hwloc-devel numactl-devel libpciaccess-devel libxml2-devel zlib-devel thrift-devel yaml-cpp-devel lz4-devel snappy-devel jsoncpp-devel systemd-devel xz-devel openssl-devel libcap-devel libselinux-devel libgcrypt-devel libgpg-error-devel elfutils-devel krb5-devel libcom_err-devel libattr-devel pcre-devel elfutils-libelf-devel bzip2-devel keyutils-libs-devel xfsprogs-devel make gnutls-devel systemd-devel lksctp-tools-devel protobuf-devel protobuf-compiler
|
||||
BuildRequires: libaio-devel libstdc++-devel cryptopp-devel hwloc-devel numactl-devel libpciaccess-devel libxml2-devel zlib-devel thrift-devel yaml-cpp-devel lz4-devel snappy-devel jsoncpp-devel systemd-devel xz-devel openssl-devel libcap-devel libselinux-devel libgcrypt-devel libgpg-error-devel elfutils-devel krb5-devel libcom_err-devel libattr-devel pcre-devel elfutils-libelf-devel bzip2-devel keyutils-libs-devel xfsprogs-devel make gnutls-devel systemd-devel lksctp-tools-devel protobuf-devel protobuf-compiler libunwind-devel
|
||||
%{?fedora:BuildRequires: boost-devel ninja-build ragel antlr3-tool antlr3-C++-devel python3 gcc-c++ libasan libubsan python3-pyparsing dnf-yum}
|
||||
%{?rhel:BuildRequires: scylla-libstdc++-static scylla-boost-devel scylla-ninja-build scylla-ragel scylla-antlr3-tool scylla-antlr3-C++-devel python34 scylla-gcc-c++ >= 5.1.1, python34-pyparsing}
|
||||
Requires: scylla-conf systemd-libs hwloc collectd PyYAML python-urwid pciutils pyparsing python-requests curl bc util-linux
|
||||
@@ -108,8 +108,8 @@ cp -r scylla-housekeeping $RPM_BUILD_ROOT%{_prefix}/lib/scylla/scylla-housekeepi
|
||||
cp -P dist/common/sbin/* $RPM_BUILD_ROOT%{_sbindir}/
|
||||
|
||||
%pre server
|
||||
/usr/sbin/groupadd scylla 2> /dev/null || :
|
||||
/usr/sbin/useradd -g scylla -s /sbin/nologin -r -d %{_sharedstatedir}/scylla scylla 2> /dev/null || :
|
||||
getent group scylla || /usr/sbin/groupadd scylla 2> /dev/null || :
|
||||
getent passwd scylla || /usr/sbin/useradd -g scylla -s /sbin/nologin -r -d %{_sharedstatedir}/scylla scylla 2> /dev/null || :
|
||||
|
||||
%post server
|
||||
# Upgrade coredump settings
|
||||
|
||||
5
dist/ubuntu/build_deb.sh
vendored
5
dist/ubuntu/build_deb.sh
vendored
@@ -78,13 +78,13 @@ cp dist/ubuntu/scylla-server.install.in debian/scylla-server.install
|
||||
if [ "$RELEASE" = "14.04" ]; then
|
||||
sed -i -e "s/@@DH_INSTALLINIT@@/--upstart-only/g" debian/rules
|
||||
sed -i -e "s/@@COMPILER@@/g++-5/g" debian/rules
|
||||
sed -i -e "s/@@BUILD_DEPENDS@@/g++-5/g" debian/control
|
||||
sed -i -e "s/@@BUILD_DEPENDS@@/g++-5, libunwind8-dev/g" debian/control
|
||||
sed -i -e "s#@@INSTALL@@#dist/ubuntu/sudoers.d/scylla etc/sudoers.d#g" debian/scylla-server.install
|
||||
sed -i -e "s#@@HKDOTTIMER@@##g" debian/scylla-server.install
|
||||
else
|
||||
sed -i -e "s/@@DH_INSTALLINIT@@//g" debian/rules
|
||||
sed -i -e "s/@@COMPILER@@/g++/g" debian/rules
|
||||
sed -i -e "s/@@BUILD_DEPENDS@@/libsystemd-dev, g++/g" debian/control
|
||||
sed -i -e "s/@@BUILD_DEPENDS@@/libsystemd-dev, g++, libunwind-dev/g" debian/control
|
||||
sed -i -e "s#@@INSTALL@@##g" debian/scylla-server.install
|
||||
sed -i -e "s#@@HKDOTTIMER@@#dist/common/systemd/scylla-housekeeping.timer /lib/systemd/system#g" debian/scylla-server.install
|
||||
fi
|
||||
@@ -102,6 +102,7 @@ fi
|
||||
cp dist/common/systemd/scylla-server.service.in debian/scylla-server.service
|
||||
sed -i -e "s#@@SYSCONFDIR@@#/etc/default#g" debian/scylla-server.service
|
||||
cp dist/common/systemd/scylla-housekeeping.service debian/scylla-server.scylla-housekeeping.service
|
||||
cp dist/common/systemd/node-exporter.service debian/scylla-server.node-exporter.service
|
||||
|
||||
if [ "$RELEASE" = "14.04" ] && [ $REBUILD -eq 0 ]; then
|
||||
if [ ! -f /etc/apt/sources.list.d/scylla-3rdparty-trusty.list ]; then
|
||||
|
||||
2
dist/ubuntu/control.in
vendored
2
dist/ubuntu/control.in
vendored
@@ -16,7 +16,7 @@ Conflicts: scylla-server (<< 1.1)
|
||||
|
||||
Package: scylla-server
|
||||
Architecture: amd64
|
||||
Depends: ${shlibs:Depends}, ${misc:Depends}, adduser, hwloc-nox, collectd, scylla-conf, python-yaml, python-urwid, python-requests, curl, bc, util-linux, @@DEPENDS@@
|
||||
Depends: ${shlibs:Depends}, ${misc:Depends}, adduser, hwloc-nox, collectd, scylla-conf, python-yaml, python-urwid, python-requests, curl, bc, util-linux, realpath, @@DEPENDS@@
|
||||
Description: Scylla database server binaries
|
||||
Scylla is a highly scalable, eventually consistent, distributed,
|
||||
partitioned row DB.
|
||||
|
||||
1
dist/ubuntu/rules.in
vendored
1
dist/ubuntu/rules.in
vendored
@@ -12,6 +12,7 @@ override_dh_auto_clean:
|
||||
override_dh_installinit:
|
||||
dh_installinit --no-start @@DH_INSTALLINIT@@
|
||||
dh_installinit --no-start --name scylla-housekeeping @@DH_INSTALLINIT@@
|
||||
dh_installinit --no-start --name node-exporter @@DH_INSTALLINIT@@
|
||||
|
||||
override_dh_strip:
|
||||
dh_strip --dbg-package=scylla-server-dbg
|
||||
|
||||
@@ -97,6 +97,18 @@ For example, to configure Scylla to run with two seed nodes `192.168.0.100` and
|
||||
$ docker run --name some-scylla -d scylladb/scylla --seeds 192.168.0.100,192.168.0.200
|
||||
```
|
||||
|
||||
### `--listen-address ADDR`
|
||||
|
||||
The `--listen-address` command line option configures the IP address the Scylla instance listens for client connections.
|
||||
|
||||
For example, to configure Scylla to use listen address `10.0.0.5`:
|
||||
|
||||
```console
|
||||
$ docker run --name some-scylla -d scylladb/scylla --listen-address 10.0.0.5
|
||||
```
|
||||
|
||||
**Since: 1.4**
|
||||
|
||||
### `--broadcast-address ADDR`
|
||||
|
||||
The `--broadcast-address` command line option configures the IP address the Scylla instance tells other Scylla nodes in the cluster to connect to.
|
||||
|
||||
@@ -54,14 +54,17 @@ static void unwrap_first_range(std::vector<range<token>>& ret) {
|
||||
|
||||
std::unique_ptr<abstract_replication_strategy> abstract_replication_strategy::create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, token_metadata& tk_metadata, const std::map<sstring, sstring>& config_options) {
|
||||
assert(locator::i_endpoint_snitch::get_local_snitch_ptr());
|
||||
|
||||
return create_object<abstract_replication_strategy,
|
||||
const sstring&,
|
||||
token_metadata&,
|
||||
snitch_ptr&,
|
||||
const std::map<sstring, sstring>&>
|
||||
(strategy_name, ks_name, tk_metadata,
|
||||
locator::i_endpoint_snitch::get_local_snitch_ptr(), config_options);
|
||||
try {
|
||||
return create_object<abstract_replication_strategy,
|
||||
const sstring&,
|
||||
token_metadata&,
|
||||
snitch_ptr&,
|
||||
const std::map<sstring, sstring>&>
|
||||
(strategy_name, ks_name, tk_metadata,
|
||||
locator::i_endpoint_snitch::get_local_snitch_ptr(), config_options);
|
||||
} catch (const no_such_class& e) {
|
||||
throw exceptions::configuration_exception(e.what());
|
||||
}
|
||||
}
|
||||
|
||||
void abstract_replication_strategy::validate_replication_strategy(const sstring& ks_name,
|
||||
|
||||
4
main.cc
4
main.cc
@@ -183,8 +183,8 @@ public:
|
||||
throw;
|
||||
}
|
||||
});
|
||||
} catch (std::system_error& e) {
|
||||
startlog.error("Directory '{}' not found. Tried to created it but failed: {}", path, e.what());
|
||||
} catch (...) {
|
||||
startlog.error("Directory '{}' cannot be initialized. Tried to do it but failed with: {}", path, std::current_exception());
|
||||
throw;
|
||||
}
|
||||
});
|
||||
|
||||
@@ -36,7 +36,8 @@ static void remove_or_mark_as_unique_owner(partition_version* current)
|
||||
}
|
||||
|
||||
partition_version::partition_version(partition_version&& pv) noexcept
|
||||
: _backref(pv._backref)
|
||||
: anchorless_list_base_hook(std::move(pv))
|
||||
, _backref(pv._backref)
|
||||
, _partition(std::move(pv._partition))
|
||||
{
|
||||
if (_backref) {
|
||||
@@ -326,9 +327,9 @@ partition_snapshot_reader::~partition_snapshot_reader()
|
||||
try {
|
||||
_read_section(_lsa_region, [this] {
|
||||
_snapshot->merge_partition_versions();
|
||||
_snapshot = {};
|
||||
});
|
||||
} catch (...) { }
|
||||
_snapshot = {};
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -375,6 +375,7 @@ public:
|
||||
++_it;
|
||||
_last = ce.key();
|
||||
_cache.upgrade_entry(ce);
|
||||
_cache._tracker.touch(ce);
|
||||
cache_data cd { { }, _cache._tracker.continuity_flags_cleared(), ce.continuous() };
|
||||
if (ce.wide_partition()) {
|
||||
return ce.read_wide(_cache, _schema, _slice, _pc).then([this, cd = std::move(cd)] (auto smopt) mutable {
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 9e1d5dbc66...7907baee37
@@ -54,7 +54,7 @@ public:
|
||||
const cql3::query_options& options,
|
||||
lw_shared_ptr<query::read_command> cmd,
|
||||
std::vector<query::partition_range> ranges)
|
||||
: _has_clustering_keys(s->clustering_key_size() > 0)
|
||||
: _has_clustering_keys(has_clustering_keys(*s, *cmd))
|
||||
, _max(cmd->row_limit)
|
||||
, _schema(std::move(s))
|
||||
, _selection(selection)
|
||||
@@ -65,6 +65,11 @@ public:
|
||||
{}
|
||||
|
||||
private:
|
||||
// Decides whether paging has to track clustering keys for this query:
// only when the schema has clustering columns and the read is not a
// DISTINCT one (the slice's `distinct` option is not set).
static bool has_clustering_keys(const schema& s, const query::read_command& cmd) {
    const bool schema_has_ck = s.clustering_key_size() > 0;
    if (!schema_has_ck) {
        return false;
    }
    const bool is_distinct = cmd.slice.options.contains<query::partition_slice::option::distinct>();
    return !is_distinct;
}
|
||||
|
||||
future<> fetch_page(cql3::selection::result_set_builder& builder, uint32_t page_size, db_clock::time_point now) override {
|
||||
auto state = _options.get_paging_state();
|
||||
|
||||
@@ -124,6 +129,43 @@ private:
|
||||
logger.trace("Result ranges {}", ranges);
|
||||
};
|
||||
|
||||
// Because of #1446 we don't have a comparator to use with
|
||||
// range<clustering_key_prefix> which would produce correct results.
|
||||
// This means we cannot reuse the same logic for dealing with
|
||||
// partition and clustering keys.
|
||||
auto modify_ck_ranges = [reversed] (const schema& s, auto& ranges, auto& lo) {
|
||||
typedef typename std::remove_reference_t<decltype(ranges)>::value_type range_type;
|
||||
typedef typename range_type::bound bound_type;
|
||||
|
||||
auto cmp = [reversed, bv_cmp = bound_view::compare(s)] (const auto& a, const auto& b) {
|
||||
return reversed ? bv_cmp(b, a) : bv_cmp(a, b);
|
||||
};
|
||||
auto start_bound = [reversed] (const auto& range) -> const bound_view& {
|
||||
return reversed ? range.second : range.first;
|
||||
};
|
||||
auto end_bound = [reversed] (const auto& range) -> const bound_view& {
|
||||
return reversed ? range.first : range.second;
|
||||
};
|
||||
clustering_key_prefix::equality eq(s);
|
||||
|
||||
auto it = ranges.begin();
|
||||
while (it != ranges.end()) {
|
||||
auto range = bound_view::from_range(*it);
|
||||
if (cmp(end_bound(range), lo) || eq(end_bound(range).prefix, lo)) {
|
||||
logger.trace("Remove ck range {}", *it);
|
||||
it = ranges.erase(it);
|
||||
continue;
|
||||
} else if (cmp(start_bound(range), lo)) {
|
||||
assert(cmp(lo, end_bound(range)));
|
||||
auto r = reversed ? range_type(it->start(), bound_type { lo, false })
|
||||
: range_type(bound_type { lo, false }, it->end());
|
||||
logger.trace("Modify ck range {} -> {}", *it, r);
|
||||
*it = std::move(r);
|
||||
}
|
||||
++it;
|
||||
}
|
||||
};
|
||||
|
||||
// last ck can be empty depending on whether we
|
||||
// deserialized state or not. This case means "last page ended on
|
||||
// something-not-bound-by-clustering" (i.e. a static row, alone)
|
||||
@@ -136,15 +178,7 @@ private:
|
||||
if (has_ck) {
|
||||
query::clustering_row_ranges row_ranges = _cmd->slice.default_row_ranges();
|
||||
clustering_key_prefix ckp = clustering_key_prefix::from_exploded(*_schema, _last_ckey->explode(*_schema));
|
||||
clustering_key_prefix::less_compare cmp_rt(*_schema);
|
||||
modify_ranges(row_ranges, ckp, false, [&cmp_rt](auto& c1, auto c2) {
|
||||
if (cmp_rt(c1, c2)) {
|
||||
return -1;
|
||||
} else if (cmp_rt(c2, c1)) {
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
});
|
||||
modify_ck_ranges(*_schema, row_ranges, ckp);
|
||||
|
||||
_cmd->slice.set_range(*_schema, *_last_pkey, row_ranges);
|
||||
}
|
||||
|
||||
@@ -2496,8 +2496,8 @@ storage_proxy::query_singular_local_digest(schema_ptr s, lw_shared_ptr<query::re
|
||||
future<foreign_ptr<lw_shared_ptr<query::result>>>
|
||||
storage_proxy::query_singular_local(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr, query::result_request request, tracing::trace_state_ptr trace_state) {
|
||||
unsigned shard = _db.local().shard_of(pr.start()->value().token());
|
||||
return _db.invoke_on(shard, [gs = global_schema_ptr(s), prv = std::vector<query::partition_range>({pr}) /* FIXME: pr is copied */, cmd, request, trace_state = std::move(trace_state)] (database& db) mutable {
|
||||
return db.query(gs, *cmd, request, prv, std::move(trace_state)).then([](auto&& f) {
|
||||
return _db.invoke_on(shard, [gs = global_schema_ptr(s), prv = std::vector<query::partition_range>({pr}) /* FIXME: pr is copied */, cmd, request, gt = tracing::global_trace_state_ptr(std::move(trace_state))] (database& db) mutable {
|
||||
return db.query(gs, *cmd, request, prv, gt).then([](auto&& f) {
|
||||
return make_foreign(std::move(f));
|
||||
});
|
||||
});
|
||||
@@ -2643,12 +2643,19 @@ storage_proxy::query_partition_key_range(lw_shared_ptr<query::read_command> cmd,
|
||||
}
|
||||
}
|
||||
|
||||
// estimate_result_rows_per_range() is currently broken, and this is not needed
|
||||
// when paging is available in any case
|
||||
#if 0
|
||||
// our estimate of how many result rows there will be per-range
|
||||
float result_rows_per_range = estimate_result_rows_per_range(cmd, ks);
|
||||
// underestimate how many rows we will get per-range in order to increase the likelihood that we'll
|
||||
// fetch enough rows in the first round
|
||||
result_rows_per_range -= result_rows_per_range * CONCURRENT_SUBREQUESTS_MARGIN;
|
||||
int concurrency_factor = result_rows_per_range == 0.0 ? 1 : std::max(1, std::min(int(ranges.size()), int(std::ceil(cmd->row_limit / result_rows_per_range))));
|
||||
#else
|
||||
int result_rows_per_range = 0;
|
||||
int concurrency_factor = 1;
|
||||
#endif
|
||||
|
||||
std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results;
|
||||
results.reserve(ranges.size()/concurrency_factor + 1);
|
||||
@@ -3446,14 +3453,14 @@ future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
|
||||
storage_proxy::query_mutations_locally(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr, tracing::trace_state_ptr trace_state) {
|
||||
if (pr.is_singular()) {
|
||||
unsigned shard = _db.local().shard_of(pr.start()->value().token());
|
||||
return _db.invoke_on(shard, [cmd, &pr, gs = global_schema_ptr(s), trace_state = std::move(trace_state)] (database& db) mutable {
|
||||
return db.query_mutations(gs, *cmd, pr, std::move(trace_state)).then([] (reconcilable_result&& result) {
|
||||
return _db.invoke_on(shard, [cmd, &pr, gs=global_schema_ptr(s), gt = tracing::global_trace_state_ptr(std::move(trace_state))] (database& db) mutable {
|
||||
return db.query_mutations(gs, *cmd, pr, gt).then([] (reconcilable_result&& result) {
|
||||
return make_foreign(make_lw_shared(std::move(result)));
|
||||
});
|
||||
});
|
||||
} else {
|
||||
return _db.map_reduce(mutation_result_merger{cmd, s}, [cmd, &pr, gs = global_schema_ptr(s), trace_state = std::move(trace_state)] (database& db) {
|
||||
return db.query_mutations(gs, *cmd, pr, trace_state).then([] (reconcilable_result&& result) {
|
||||
return _db.map_reduce(mutation_result_merger{ cmd, s }, [cmd, &pr, gs=global_schema_ptr(s), gt = tracing::global_trace_state_ptr(std::move(trace_state))] (database& db) {
|
||||
return db.query_mutations(gs, *cmd, pr, gt).then([] (reconcilable_result&& result) {
|
||||
return make_foreign(make_lw_shared(std::move(result)));
|
||||
});
|
||||
}).then([] (reconcilable_result&& result) {
|
||||
|
||||
140
sstables/atomic_deletion.cc
Normal file
140
sstables/atomic_deletion.cc
Normal file
@@ -0,0 +1,140 @@
|
||||
/*
|
||||
* Copyright (C) 2016 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "atomic_deletion.hh"
|
||||
#include "to_string.hh"
|
||||
#include <seastar/core/shared_future.hh>
|
||||
#include <boost/range/adaptor/transformed.hpp>
|
||||
#include <boost/range/algorithm/copy.hpp>
|
||||
|
||||
namespace sstables {
|
||||
|
||||
// Creates a manager coordinating deletions across `shard_count` shards.
// `delete_sstables` is the callback invoked with the sstable names of a set
// once that set is ready to be deleted.
atomic_deletion_manager::atomic_deletion_manager(
        unsigned shard_count,
        std::function<future<> (std::vector<sstring> sstables)> delete_sstables)
    : _shard_count(shard_count)
    , _delete_sstables(std::move(delete_sstables))
{
}
|
||||
|
||||
// Registers deleting_shard's request to delete atomic_deletion_set as a unit.
//
// Runs on shard 0 only. The requested set is merged with every pending set it
// shares an sstable with, and the merged set is passed to _delete_sstables
// only once every shard has agreed to delete every sstable in it. Sstables
// that are not marked `shared` are recorded as agreed to by all shards.
//
// Returns a future that is resolved (or failed) with the outcome of deleting
// the merged set; it stays pending while some shard has not agreed yet.
// Throws atomic_deletion_cancelled synchronously if deletions were cancelled.
future<>
atomic_deletion_manager::delete_atomically(std::vector<sstable_to_delete> atomic_deletion_set, unsigned deleting_shard) {
    // runs on shard 0 only
    _deletion_logger.debug("shard {} atomically deleting {}", deleting_shard, atomic_deletion_set);

    if (_atomic_deletions_cancelled) {
        _deletion_logger.debug("atomic deletions disabled, erroring out");
        using boost::adaptors::transformed;
        throw atomic_deletion_cancelled(atomic_deletion_set
                                        | transformed(std::mem_fn(&sstable_to_delete::name)));
    }

    // Insert atomic_deletion_set into the list of sets pending deletion. If the new set
    // overlaps with an existing set, merge them (the merged set will be deleted atomically).
    auto merged_set = make_lw_shared(pending_deletion());
    for (auto&& sst_to_delete : atomic_deletion_set) {
        merged_set->names.insert(sst_to_delete.name);
        if (!sst_to_delete.shared) {
            // An unshared sstable needs no cross-shard agreement: mark every shard as agreeing.
            for (auto shard : boost::irange<shard_id>(0, _shard_count)) {
                _shards_agreeing_to_delete_sstable[sst_to_delete.name].insert(shard);
            }
        }
    }
    // This caller's completion; it is fulfilled together with all other
    // completions of the merged set.
    auto pr = make_lw_shared<promise<>>();
    merged_set->completions.insert(pr);
    auto ret = pr->get_future();
    for (auto&& sst_to_delete : atomic_deletion_set) {
        auto i = _atomic_deletion_sets.find(sst_to_delete.name);
        // merge from old deletion set to new deletion set
        // i->second can be nullptr, see below why
        if (i != _atomic_deletion_sets.end() && i->second) {
            boost::copy(i->second->names, std::inserter(merged_set->names, merged_set->names.end()));
            boost::copy(i->second->completions, std::inserter(merged_set->completions, merged_set->completions.end()));
        }
    }
    _deletion_logger.debug("new atomic set: {}", merged_set->names);
    // we need to point every member of the merged set at merged_set in
    // _atomic_deletion_sets, but beware of exceptions. We do that with a first
    // pass that inserts nullptr as the value, so the second pass only replaces,
    // and does not allocate
    for (auto&& sst_to_delete : atomic_deletion_set) {
        _atomic_deletion_sets.emplace(sst_to_delete.name, nullptr);
    }
    // now, no allocations are involved, so this commits the operation atomically
    for (auto&& n : merged_set->names) {
        auto i = _atomic_deletion_sets.find(n);
        i->second = merged_set;
    }

    // Mark each sstable as being deleted from deleting_shard. We have to do
    // this in a separate pass, so the consideration whether we can delete or not
    // sees all the data from this pass.
    for (auto&& sst : atomic_deletion_set) {
        _shards_agreeing_to_delete_sstable[sst.name].insert(deleting_shard);
    }

    // Figure out if the (possibly merged) set can be deleted
    for (auto&& sst : merged_set->names) {
        if (_shards_agreeing_to_delete_sstable[sst].size() != _shard_count) {
            // Not everyone agrees, leave the set pending
            _deletion_logger.debug("deferring deletion until all shards agree");
            return ret;
        }
    }

    // Cannot recover from a failed deletion
    for (auto&& name : merged_set->names) {
        _atomic_deletion_sets.erase(name);
        _shards_agreeing_to_delete_sstable.erase(name);
    }

    // Everyone agrees, let's delete
    auto names = boost::copy_range<std::vector<sstring>>(merged_set->names);
    _deletion_logger.debug("deleting {}", names);
    // Run the deletion in the background: every caller, including this one, is
    // notified through merged_set->completions (pr is one of them). Returning
    // the continuation's future instead would always report success to this
    // caller, because the continuation consumes the deletion result itself.
    (void)_delete_sstables(names).then_wrapped([this, merged_set] (future<> result) {
        _deletion_logger.debug("atomic deletion completed: {}", merged_set->names);
        shared_future<> sf(std::move(result));
        for (auto&& comp : merged_set->completions) {
            sf.get_future().forward_to(std::move(*comp));
        }
    });

    return ret;
}
|
||||
|
||||
void
|
||||
atomic_deletion_manager::cancel_atomic_deletions() {
|
||||
_atomic_deletions_cancelled = true;
|
||||
for (auto&& pd : _atomic_deletion_sets) {
|
||||
if (!pd.second) {
|
||||
// Could happen if a delete_atomically() failed
|
||||
continue;
|
||||
}
|
||||
for (auto&& c : pd.second->completions) {
|
||||
c->set_exception(atomic_deletion_cancelled(pd.second->names));
|
||||
}
|
||||
// since sets are shared, make sure we don't hit the same one again
|
||||
pd.second->completions.clear();
|
||||
}
|
||||
_atomic_deletion_sets.clear();
|
||||
_shards_agreeing_to_delete_sstable.clear();
|
||||
}
|
||||
|
||||
}
|
||||
92
sstables/atomic_deletion.hh
Normal file
92
sstables/atomic_deletion.hh
Normal file
@@ -0,0 +1,92 @@
|
||||
/*
|
||||
* Copyright (C) 2016 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
// The atomic deletion manager solves the problem of orchestrating
|
||||
// the deletion of files that must be deleted as a group, where each
|
||||
// shard has different groups, and all shards delete a file for it to
|
||||
// be deleted. For example,
|
||||
//
|
||||
// shard 0: delete "A"
|
||||
// we can't delete anything because shard 1 hasn't agreed yet.
|
||||
// shard 1: delete "A" and "B"
|
||||
// shard 1 agrees to delete "A", but we can't delete it yet,
|
||||
// because shard 1 requires that it be deleted together with "B",
|
||||
// and shard 0 hasn't agreed to delete "B" yet.
|
||||
// shard 0: delete "B" and "C"
|
||||
// shards 0 and 1 now both agree to delete "A" and "B", but shard 0
|
||||
// doesn't allow us to delete "B" without "C".
|
||||
// shard 1: delete "C"
|
||||
// finally, we can delete "A", "B", and "C".
|
||||
|
||||
#include "log.hh"
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/future-util.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/core/reactor.hh> // for shard_id
|
||||
#include <unordered_set>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
namespace sstables {
|
||||
|
||||
struct sstable_to_delete {
|
||||
sstable_to_delete(sstring name, bool shared) : name(std::move(name)), shared(shared) {}
|
||||
sstring name;
|
||||
bool shared = false;
|
||||
friend std::ostream& operator<<(std::ostream& os, const sstable_to_delete& std);
|
||||
};
|
||||
|
||||
class atomic_deletion_cancelled : public std::exception {
|
||||
std::string _msg;
|
||||
public:
|
||||
explicit atomic_deletion_cancelled(std::vector<sstring> names);
|
||||
template <typename StringRange>
|
||||
explicit atomic_deletion_cancelled(StringRange range)
|
||||
: atomic_deletion_cancelled(std::vector<sstring>{range.begin(), range.end()}) {
|
||||
}
|
||||
const char* what() const noexcept override;
|
||||
};
|
||||
|
||||
class atomic_deletion_manager {
|
||||
logging::logger _deletion_logger{"sstable-deletion"};
|
||||
using shards_agreeing_to_delete_sstable_type = std::unordered_set<shard_id>;
|
||||
using sstables_to_delete_atomically_type = std::set<sstring>;
|
||||
struct pending_deletion {
|
||||
sstables_to_delete_atomically_type names;
|
||||
std::unordered_set<lw_shared_ptr<promise<>>> completions;
|
||||
};
|
||||
bool _atomic_deletions_cancelled = false;
|
||||
// map from sstable name to a set of sstables that must be deleted atomically, including itself
|
||||
std::unordered_map<sstring, lw_shared_ptr<pending_deletion>> _atomic_deletion_sets;
|
||||
std::unordered_map<sstring, shards_agreeing_to_delete_sstable_type> _shards_agreeing_to_delete_sstable;
|
||||
unsigned _shard_count;
|
||||
std::function<future<> (std::vector<sstring> sstables)> _delete_sstables;
|
||||
public:
|
||||
atomic_deletion_manager(unsigned shard_count,
|
||||
std::function<future<> (std::vector<sstring> sstables)> delete_sstables);
|
||||
future<> delete_atomically(std::vector<sstable_to_delete> atomic_deletion_set, unsigned deleting_shard);
|
||||
void cancel_atomic_deletions();
|
||||
};
|
||||
|
||||
}
|
||||
@@ -216,6 +216,7 @@ protected:
|
||||
public:
|
||||
virtual ~compaction_strategy_impl() {}
|
||||
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) = 0;
|
||||
virtual void notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) { }
|
||||
virtual compaction_strategy_type type() const = 0;
|
||||
virtual bool parallel_compaction() const {
|
||||
return true;
|
||||
@@ -583,6 +584,8 @@ class leveled_compaction_strategy : public compaction_strategy_impl {
|
||||
const sstring SSTABLE_SIZE_OPTION = "sstable_size_in_mb";
|
||||
|
||||
int32_t _max_sstable_size_in_mb = DEFAULT_MAX_SSTABLE_SIZE_IN_MB;
|
||||
std::vector<stdx::optional<dht::decorated_key>> _last_compacted_keys;
|
||||
std::vector<int> _compaction_counter;
|
||||
public:
|
||||
leveled_compaction_strategy(const std::map<sstring, sstring>& options) {
|
||||
using namespace cql3::statements;
|
||||
@@ -596,10 +599,14 @@ public:
|
||||
logger.warn("Max sstable size of {}MB is configured. Testing done for CASSANDRA-5727 indicates that performance improves up to 160MB",
|
||||
_max_sstable_size_in_mb);
|
||||
}
|
||||
_last_compacted_keys.resize(leveled_manifest::MAX_LEVELS);
|
||||
_compaction_counter.resize(leveled_manifest::MAX_LEVELS);
|
||||
}
|
||||
|
||||
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) override;
|
||||
|
||||
virtual void notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) override;
|
||||
|
||||
virtual int64_t estimated_pending_compactions(column_family& cf) const override;
|
||||
|
||||
virtual bool parallel_compaction() const override {
|
||||
@@ -621,7 +628,7 @@ compaction_descriptor leveled_compaction_strategy::get_sstables_for_compaction(c
|
||||
// sstable in it may be marked for deletion after compacted.
|
||||
// Currently, we create a new manifest whenever it's time for compaction.
|
||||
leveled_manifest manifest = leveled_manifest::create(cfs, candidates, _max_sstable_size_in_mb);
|
||||
auto candidate = manifest.get_compaction_candidates();
|
||||
auto candidate = manifest.get_compaction_candidates(_last_compacted_keys, _compaction_counter);
|
||||
|
||||
if (candidate.sstables.empty()) {
|
||||
return sstables::compaction_descriptor();
|
||||
@@ -632,6 +639,24 @@ compaction_descriptor leveled_compaction_strategy::get_sstables_for_compaction(c
|
||||
return std::move(candidate);
|
||||
}
|
||||
|
||||
// Remembers where the last compaction ended: for the lowest level found among
// the removed sstables, stores the last decorated key of the added sstable
// that orders last by first key. Does nothing when either list is empty.
void leveled_compaction_strategy::notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) {
    if (removed.empty() || added.empty()) {
        return;
    }

    auto lowest_level = std::numeric_limits<uint32_t>::max();
    for (auto& sst : removed) {
        lowest_level = std::min(lowest_level, sst->get_sstable_level());
    }

    // First element that no other added sstable orders after, by first key.
    auto newest = std::max_element(added.begin(), added.end(), [] (const auto& lhs, const auto& rhs) {
        const sstables::sstable& left = *lhs;
        return left.compare_by_first_key(*rhs) < 0;
    });
    const sstables::sstable& last = **newest;
    _last_compacted_keys[lowest_level] = last.get_last_decorated_key();
}
|
||||
|
||||
int64_t leveled_compaction_strategy::estimated_pending_compactions(column_family& cf) const {
|
||||
std::vector<sstables::shared_sstable> sstables;
|
||||
sstables.reserve(cf.sstables_count());
|
||||
@@ -686,6 +711,10 @@ compaction_descriptor compaction_strategy::get_sstables_for_compaction(column_fa
|
||||
return _compaction_strategy_impl->get_sstables_for_compaction(cfs, std::move(candidates));
|
||||
}
|
||||
|
||||
// Forwards the compaction result (sstables removed and added) to the
// concrete strategy implementation.
void compaction_strategy::notify_completion(
        const std::vector<lw_shared_ptr<sstable>>& removed,
        const std::vector<lw_shared_ptr<sstable>>& added) {
    auto& impl = *_compaction_strategy_impl;
    impl.notify_completion(removed, added);
}
|
||||
|
||||
bool compaction_strategy::parallel_compaction() const {
|
||||
return _compaction_strategy_impl->parallel_compaction();
|
||||
}
|
||||
|
||||
@@ -121,7 +121,11 @@ size_t compress_lz4(const char* input, size_t input_len,
|
||||
output[1] = (input_len >> 8) & 0xFF;
|
||||
output[2] = (input_len >> 16) & 0xFF;
|
||||
output[3] = (input_len >> 24) & 0xFF;
|
||||
#ifdef HAVE_LZ4_COMPRESS_DEFAULT
|
||||
auto ret = LZ4_compress_default(input, output + 4, input_len, LZ4_compressBound(input_len));
|
||||
#else
|
||||
auto ret = LZ4_compress(input, output + 4, input_len);
|
||||
#endif
|
||||
if (ret == 0) {
|
||||
throw std::runtime_error("LZ4 compression failure: LZ4_compress() failed");
|
||||
}
|
||||
|
||||
@@ -37,7 +37,7 @@ public:
|
||||
bool should_continue() {
|
||||
return indexes.size() < max_quantity;
|
||||
}
|
||||
void consume_entry(index_entry&& ie) {
|
||||
void consume_entry(index_entry&& ie, uint64_t offset) {
|
||||
indexes.push_back(std::move(ie));
|
||||
}
|
||||
};
|
||||
@@ -45,13 +45,14 @@ public:
|
||||
// IndexConsumer is a concept that implements:
|
||||
//
|
||||
// bool should_continue();
|
||||
// void consume_entry(index_entry&& ie);
|
||||
// void consume_entry(index_entry&& ie, uint64_t offset);
|
||||
template <class IndexConsumer>
|
||||
class index_consume_entry_context: public data_consumer::continuous_data_consumer<index_consume_entry_context<IndexConsumer>> {
|
||||
using proceed = data_consumer::proceed;
|
||||
using continuous_data_consumer = data_consumer::continuous_data_consumer<index_consume_entry_context<IndexConsumer>>;
|
||||
private:
|
||||
IndexConsumer& _consumer;
|
||||
uint64_t _entry_offset;
|
||||
|
||||
enum class state {
|
||||
START,
|
||||
@@ -109,9 +110,12 @@ public:
|
||||
_state = state::CONSUME_ENTRY;
|
||||
break;
|
||||
}
|
||||
case state::CONSUME_ENTRY:
|
||||
_consumer.consume_entry(index_entry(std::move(_key), this->_u64, std::move(_promoted)));
|
||||
case state::CONSUME_ENTRY: {
|
||||
auto len = (_key.size() + _promoted.size() + 14);
|
||||
_consumer.consume_entry(index_entry(std::move(_key), this->_u64, std::move(_promoted)), _entry_offset);
|
||||
_entry_offset += len;
|
||||
_state = state::START;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw malformed_sstable_exception("unknown state");
|
||||
@@ -120,10 +124,9 @@ public:
|
||||
}
|
||||
|
||||
index_consume_entry_context(IndexConsumer& consumer,
|
||||
input_stream<char>&& input, uint64_t maxlen)
|
||||
input_stream<char>&& input, uint64_t start, uint64_t maxlen)
|
||||
: continuous_data_consumer(std::move(input), maxlen)
|
||||
, _consumer(consumer)
|
||||
, _consumer(consumer), _entry_offset(start)
|
||||
{}
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
@@ -64,16 +64,14 @@ class leveled_manifest {
|
||||
|
||||
schema_ptr _schema;
|
||||
std::vector<std::list<sstables::shared_sstable>> _generations;
|
||||
#if 0
|
||||
private final RowPosition[] lastCompactedKeys;
|
||||
#endif
|
||||
uint64_t _max_sstable_size_in_bytes;
|
||||
#if 0
|
||||
private final SizeTieredCompactionStrategyOptions options;
|
||||
private final int [] compactionCounter;
|
||||
#endif
|
||||
|
||||
public:
|
||||
static constexpr int MAX_LEVELS = 9; // log10(1000^3);
|
||||
|
||||
leveled_manifest(column_family& cfs, int max_sstable_size_in_MB)
|
||||
: logger("LeveledManifest")
|
||||
, _schema(cfs.schema())
|
||||
@@ -82,15 +80,8 @@ public:
|
||||
// allocate enough generations for a PB of data, with a 1-MB sstable size. (Note that if maxSSTableSize is
|
||||
// updated, we will still have sstables of the older, potentially smaller size. So don't make this
|
||||
// dependent on maxSSTableSize.)
|
||||
uint64_t n = 9; // log10(1000^3)
|
||||
_generations.resize(n);
|
||||
_generations.resize(MAX_LEVELS);
|
||||
#if 0
|
||||
lastCompactedKeys = new RowPosition[n];
|
||||
for (int i = 0; i < generations.length; i++)
|
||||
{
|
||||
generations[i] = new ArrayList<>();
|
||||
lastCompactedKeys[i] = cfs.partitioner.getMinimumToken().minKeyBound();
|
||||
}
|
||||
compactionCounter = new int[n];
|
||||
#endif
|
||||
}
|
||||
@@ -129,37 +120,6 @@ public:
|
||||
_generations[level].push_back(sstable);
|
||||
}
|
||||
|
||||
#if 0
|
||||
public synchronized void replace(Collection<SSTableReader> removed, Collection<SSTableReader> added)
|
||||
{
|
||||
assert !removed.isEmpty(); // use add() instead of promote when adding new sstables
|
||||
logDistribution();
|
||||
if (logger.isDebugEnabled())
|
||||
logger.debug("Replacing [{}]", toString(removed));
|
||||
|
||||
// the level for the added sstables is the max of the removed ones,
|
||||
// plus one if the removed were all on the same level
|
||||
int minLevel = Integer.MAX_VALUE;
|
||||
|
||||
for (SSTableReader sstable : removed)
|
||||
{
|
||||
int thisLevel = remove(sstable);
|
||||
minLevel = Math.min(minLevel, thisLevel);
|
||||
}
|
||||
|
||||
// it's valid to do a remove w/o an add (e.g. on truncate)
|
||||
if (added.isEmpty())
|
||||
return;
|
||||
|
||||
if (logger.isDebugEnabled())
|
||||
logger.debug("Adding [{}]", toString(added));
|
||||
|
||||
for (SSTableReader ssTableReader : added)
|
||||
add(ssTableReader);
|
||||
lastCompactedKeys[minLevel] = SSTableReader.sstableOrdering.max(added).last;
|
||||
}
|
||||
#endif
|
||||
|
||||
void repair_overlapping_sstables(int level) {
|
||||
const sstables::sstable *previous = nullptr;
|
||||
const schema& s = *_schema;
|
||||
@@ -272,7 +232,8 @@ public:
|
||||
* @return highest-priority sstables to compact, and level to compact them to
|
||||
* If no compactions are necessary, will return null
|
||||
*/
|
||||
sstables::compaction_descriptor get_compaction_candidates() {
|
||||
sstables::compaction_descriptor get_compaction_candidates(const std::vector<stdx::optional<dht::decorated_key>>& last_compacted_keys,
|
||||
std::vector<int>& compaction_counter) {
|
||||
#if 0
|
||||
// during bootstrap we only do size tiering in L0 to make sure
|
||||
// the streamed files can be placed in their original levels
|
||||
@@ -339,11 +300,12 @@ public:
|
||||
}
|
||||
}
|
||||
// L0 is fine, proceed with this level
|
||||
auto candidates = get_candidates_for(i);
|
||||
auto candidates = get_candidates_for(i, last_compacted_keys);
|
||||
if (!candidates.empty()) {
|
||||
int next_level = get_next_level(candidates);
|
||||
|
||||
candidates = get_overlapping_starved_sstables(next_level, std::move(candidates), compaction_counter);
|
||||
#if 0
|
||||
candidates = getOverlappingStarvedSSTables(nextLevel, candidates);
|
||||
if (logger.isDebugEnabled())
|
||||
logger.debug("Compaction candidates for L{} are {}", i, toString(candidates));
|
||||
#endif
|
||||
@@ -359,7 +321,7 @@ public:
|
||||
if (get_level(0).empty()) {
|
||||
return sstables::compaction_descriptor();
|
||||
}
|
||||
auto candidates = get_candidates_for(0);
|
||||
auto candidates = get_candidates_for(0, last_compacted_keys);
|
||||
if (candidates.empty()) {
|
||||
return sstables::compaction_descriptor();
|
||||
}
|
||||
@@ -391,49 +353,57 @@ public:
|
||||
* @param candidates the original sstables to compact
|
||||
* @return
|
||||
*/
|
||||
#if 0
|
||||
private Collection<SSTableReader> getOverlappingStarvedSSTables(int targetLevel, Collection<SSTableReader> candidates)
|
||||
{
|
||||
Set<SSTableReader> withStarvedCandidate = new HashSet<>(candidates);
|
||||
std::vector<sstables::shared_sstable>
|
||||
get_overlapping_starved_sstables(int target_level, std::vector<sstables::shared_sstable>&& candidates, std::vector<int>& compaction_counter) {
|
||||
for (int i = _generations.size() - 1; i > 0; i--) {
|
||||
compaction_counter[i]++;
|
||||
}
|
||||
compaction_counter[target_level] = 0;
|
||||
|
||||
for (int i = generations.length - 1; i > 0; i--)
|
||||
compactionCounter[i]++;
|
||||
compactionCounter[targetLevel] = 0;
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
for (int j = 0; j < compactionCounter.length; j++)
|
||||
logger.debug("CompactionCounter: {}: {}", j, compactionCounter[j]);
|
||||
if (logger.level() == logging::log_level::debug) {
|
||||
for (auto j = 0U; j < compaction_counter.size(); j++) {
|
||||
logger.debug("CompactionCounter: {}: {}", j, compaction_counter[j]);
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = generations.length - 1; i > 0; i--)
|
||||
{
|
||||
if (getLevelSize(i) > 0)
|
||||
{
|
||||
if (compactionCounter[i] > NO_COMPACTION_LIMIT)
|
||||
{
|
||||
for (int i = _generations.size() - 1; i > 0; i--) {
|
||||
if (get_level_size(i) > 0) {
|
||||
if (compaction_counter[i] > NO_COMPACTION_LIMIT) {
|
||||
// we try to find an sstable that is fully contained within the boundaries we are compacting;
|
||||
// say we are compacting 3 sstables: 0->30 in L1 and 0->12, 12->33 in L2
|
||||
// this means that we will not create overlap in L2 if we add an sstable
|
||||
// contained within 0 -> 33 to the compaction
|
||||
RowPosition max = null;
|
||||
RowPosition min = null;
|
||||
for (SSTableReader candidate : candidates)
|
||||
{
|
||||
if (min == null || candidate.first.compareTo(min) < 0)
|
||||
min = candidate.first;
|
||||
if (max == null || candidate.last.compareTo(max) > 0)
|
||||
max = candidate.last;
|
||||
stdx::optional<dht::decorated_key> max;
|
||||
stdx::optional<dht::decorated_key> min;
|
||||
// Compute the key span covered by the current candidates: `min` becomes the
// smallest first key and `max` the largest last key seen across all of them.
for (auto& candidate : candidates) {
    auto& candidate_first = candidate->get_first_decorated_key();
    if (!min || candidate_first.tri_compare(*_schema, *min) < 0) {
        min = candidate_first;
    }
    // The upper bound has to come from the sstable's LAST key (the Java code this
    // was translated from compares candidate.last). Reading the first key here
    // made `max` the largest *first* key, shrinking the boundaries range.
    auto& candidate_last = candidate->get_last_decorated_key();
    if (!max || candidate_last.tri_compare(*_schema, *max) > 0) {
        max = candidate_last;
    }
}
|
||||
#if 0
|
||||
// NOTE: There is no need to filter out sstables that are currently being compacted here, because the
// strategy only deals with sstables that are not being compacted, and parallel compaction is disabled for LCS.
|
||||
Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting();
|
||||
Range<RowPosition> boundaries = new Range<>(min, max);
|
||||
for (SSTableReader sstable : getLevel(i))
|
||||
{
|
||||
Range<RowPosition> r = new Range<RowPosition>(sstable.first, sstable.last);
|
||||
if (boundaries.contains(r) && !compacting.contains(sstable))
|
||||
{
|
||||
logger.info("Adding high-level (L{}) {} to candidates", sstable.getSSTableLevel(), sstable);
|
||||
withStarvedCandidate.add(sstable);
|
||||
return withStarvedCandidate;
|
||||
#endif
|
||||
auto boundaries = ::range<dht::decorated_key>::make(*min, *max);
|
||||
for (auto& sstable : get_level(i)) {
|
||||
auto r = ::range<dht::decorated_key>::make(sstable->get_first_decorated_key(), sstable->get_last_decorated_key());
|
||||
if (boundaries.contains(r, dht::ring_position_comparator(*_schema))) {
|
||||
logger.info("Adding high-level (L{}) {} to candidates", sstable->get_sstable_level(), sstable->get_filename());
|
||||
|
||||
auto result = std::find_if(std::begin(candidates), std::end(candidates), [&sstable] (auto& candidate) {
|
||||
return sstable->generation() == candidate->generation();
|
||||
});
|
||||
if (result != std::end(candidates)) {
|
||||
continue;
|
||||
}
|
||||
candidates.push_back(sstable);
|
||||
return candidates;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -443,7 +413,6 @@ public:
|
||||
|
||||
return candidates;
|
||||
}
|
||||
#endif
|
||||
|
||||
size_t get_level_size(uint32_t level) {
|
||||
#if 0
|
||||
@@ -557,7 +526,7 @@ public:
|
||||
* If no compactions are possible (because of concurrent compactions or because some sstables are blacklisted
|
||||
* for prior failure), will return an empty list. Never returns null.
|
||||
*/
|
||||
std::vector<sstables::shared_sstable> get_candidates_for(int level) {
|
||||
std::vector<sstables::shared_sstable> get_candidates_for(int level, const std::vector<stdx::optional<dht::decorated_key>>& last_compacted_keys) {
|
||||
const schema& s = *_schema;
|
||||
assert(!get_level(level).empty());
|
||||
|
||||
@@ -657,31 +626,35 @@ public:
|
||||
}
|
||||
|
||||
// for non-L0 compactions, pick up where we left off last time
|
||||
get_level(level).sort([&s] (auto& i, auto& j) {
|
||||
std::list<sstables::shared_sstable>& sstables = get_level(level);
|
||||
sstables.sort([&s] (auto& i, auto& j) {
|
||||
return i->compare_by_first_key(*j) < 0;
|
||||
});
|
||||
int start = 0; // handles case where the prior compaction touched the very last range
|
||||
#if 0
|
||||
for (int i = 0; i < getLevel(level).size(); i++)
|
||||
{
|
||||
SSTableReader sstable = getLevel(level).get(i);
|
||||
if (sstable.first.compareTo(lastCompactedKeys[level]) > 0)
|
||||
{
|
||||
start = i;
|
||||
int idx = 0;
|
||||
for (auto& sstable : sstables) {
|
||||
if (uint32_t(level) >= last_compacted_keys.size()) {
|
||||
throw std::runtime_error(sprint("Invalid level %u out of %ld", level, (last_compacted_keys.size() - 1)));
|
||||
}
|
||||
auto& sstable_first = sstable->get_first_decorated_key();
|
||||
if (!last_compacted_keys[level] || sstable_first.tri_compare(s, *last_compacted_keys[level]) > 0) {
|
||||
start = idx;
|
||||
break;
|
||||
}
|
||||
idx++;
|
||||
}
|
||||
#endif
|
||||
|
||||
// look for a non-suspect keyspace to compact with, starting with where we left off last time,
|
||||
// and wrapping back to the beginning of the generation if necessary
|
||||
for (auto i = 0U; i < get_level(level).size(); i++) {
|
||||
for (auto i = 0U; i < sstables.size(); i++) {
|
||||
// get an iterator to the element of position pos from the list get_level(level).
|
||||
auto pos = (start + i) % get_level(level).size();
|
||||
auto it = get_level(level).begin();
|
||||
auto pos = (start + i) % sstables.size();
|
||||
auto it = sstables.begin();
|
||||
std::advance(it, pos);
|
||||
|
||||
auto sstable = *it;
|
||||
auto& sstable = *it;
|
||||
auto candidates = overlapping(*_schema, sstable, get_level(level + 1));
|
||||
|
||||
candidates.push_back(sstable);
|
||||
#if 0
|
||||
if (Iterables.any(candidates, suspectP))
|
||||
|
||||
@@ -902,7 +902,7 @@ future<index_list> sstable::read_indexes(uint64_t summary_idx, const io_priority
|
||||
auto stream = make_file_input_stream(this->_index_file, position, end - position, std::move(options));
|
||||
// TODO: it's redundant to constrain the consumer here to stop at
|
||||
// index_size()-position, the input stream is already constrained.
|
||||
auto ctx = make_lw_shared<index_consume_entry_context<index_consumer>>(ic, std::move(stream), this->index_size() - position);
|
||||
auto ctx = make_lw_shared<index_consume_entry_context<index_consumer>>(ic, std::move(stream), position, this->index_size() - position);
|
||||
return ctx->consume_input(*ctx).finally([ctx] {
|
||||
return ctx->close();
|
||||
}).then([ctx, &ic] {
|
||||
@@ -1887,8 +1887,8 @@ future<> sstable::generate_summary(const io_priority_class& pc) {
|
||||
bool should_continue() {
|
||||
return true;
|
||||
}
|
||||
void consume_entry(index_entry&& ie) {
|
||||
maybe_add_summary_entry(_summary, ie.get_key_bytes(), ie.position());
|
||||
void consume_entry(index_entry&& ie, uint64_t offset) {
|
||||
maybe_add_summary_entry(_summary, ie.get_key_bytes(), offset);
|
||||
if (!first_key) {
|
||||
first_key = key(to_bytes(ie.get_key_bytes()));
|
||||
} else {
|
||||
@@ -1911,7 +1911,7 @@ future<> sstable::generate_summary(const io_priority_class& pc) {
|
||||
options.io_priority_class = pc;
|
||||
auto stream = make_file_input_stream(index_file, 0, size, std::move(options));
|
||||
return do_with(summary_generator(_summary), [this, &pc, stream = std::move(stream), size] (summary_generator& s) mutable {
|
||||
auto ctx = make_lw_shared<index_consume_entry_context<summary_generator>>(s, std::move(stream), size);
|
||||
auto ctx = make_lw_shared<index_consume_entry_context<summary_generator>>(s, std::move(stream), 0, size);
|
||||
return ctx->consume_input(*ctx).finally([ctx] {
|
||||
return ctx->close();
|
||||
}).then([this, ctx, &s] {
|
||||
@@ -2259,8 +2259,11 @@ remove_by_toc_name(sstring sstable_toc_name) {
|
||||
dir = dirname(sstable_toc_name);
|
||||
sstable_write_io_check(rename_file, sstable_toc_name, new_toc_name).get();
|
||||
sstable_write_io_check(fsync_directory, dir).get();
|
||||
} else {
|
||||
} else if (sstable_write_io_check(file_exists, new_toc_name).get0()) {
|
||||
dir = dirname(new_toc_name);
|
||||
} else {
|
||||
sstlog.warn("Unable to delete {} because it doesn't exist.", sstable_toc_name);
|
||||
return;
|
||||
}
|
||||
|
||||
auto toc_file = open_checked_file_dma(sstable_read_error, new_toc_name, open_flags::ro).get0();
|
||||
@@ -2392,107 +2395,21 @@ operator<<(std::ostream& os, const sstable_to_delete& std) {
|
||||
return os << std.name << "(" << (std.shared ? "shared" : "unshared") << ")";
|
||||
}
|
||||
|
||||
using shards_agreeing_to_delete_sstable_type = std::unordered_set<shard_id>;
|
||||
using sstables_to_delete_atomically_type = std::set<sstring>;
|
||||
struct pending_deletion {
|
||||
sstables_to_delete_atomically_type names;
|
||||
std::vector<lw_shared_ptr<promise<>>> completions;
|
||||
};
|
||||
|
||||
static thread_local bool g_atomic_deletions_cancelled = false;
|
||||
static thread_local std::list<lw_shared_ptr<pending_deletion>> g_atomic_deletion_sets;
|
||||
static thread_local std::unordered_map<sstring, shards_agreeing_to_delete_sstable_type> g_shards_agreeing_to_delete_sstable;
|
||||
|
||||
static logging::logger deletion_logger("sstable-deletion");
|
||||
|
||||
static
|
||||
future<>
|
||||
do_delete_atomically(std::vector<sstable_to_delete> atomic_deletion_set, unsigned deleting_shard) {
|
||||
// runs on shard 0 only
|
||||
deletion_logger.debug("shard {} atomically deleting {}", deleting_shard, atomic_deletion_set);
|
||||
|
||||
if (g_atomic_deletions_cancelled) {
|
||||
deletion_logger.debug("atomic deletions disabled, erroring out");
|
||||
using boost::adaptors::transformed;
|
||||
throw atomic_deletion_cancelled(atomic_deletion_set
|
||||
| transformed(std::mem_fn(&sstable_to_delete::name)));
|
||||
}
|
||||
|
||||
// Insert atomic_deletion_set into the list of sets pending deletion. If the new set
|
||||
// overlaps with an existing set, merge them (the merged set will be deleted atomically).
|
||||
std::list<lw_shared_ptr<pending_deletion>> new_atomic_deletion_sets;
|
||||
auto merged_set = make_lw_shared(pending_deletion());
|
||||
for (auto&& sst_to_delete : atomic_deletion_set) {
|
||||
merged_set->names.insert(sst_to_delete.name);
|
||||
if (!sst_to_delete.shared) {
|
||||
for (auto shard : boost::irange<shard_id>(0, smp::count)) {
|
||||
g_shards_agreeing_to_delete_sstable[sst_to_delete.name].insert(shard);
|
||||
}
|
||||
}
|
||||
}
|
||||
merged_set->completions.push_back(make_lw_shared<promise<>>());
|
||||
auto ret = merged_set->completions.back()->get_future();
|
||||
for (auto&& old_set : g_atomic_deletion_sets) {
|
||||
auto intersection = sstables_to_delete_atomically_type();
|
||||
boost::set_intersection(merged_set->names, old_set->names, std::inserter(intersection, intersection.end()));
|
||||
if (intersection.empty()) {
|
||||
// We copy old_set to avoid corrupting g_atomic_deletion_sets if we fail
|
||||
// further on.
|
||||
new_atomic_deletion_sets.push_back(old_set);
|
||||
} else {
|
||||
deletion_logger.debug("merging with {}", old_set->names);
|
||||
merged_set->names.insert(old_set->names.begin(), old_set->names.end());
|
||||
boost::push_back(merged_set->completions, old_set->completions);
|
||||
}
|
||||
}
|
||||
deletion_logger.debug("new atomic set: {}", merged_set->names);
|
||||
new_atomic_deletion_sets.push_back(merged_set);
|
||||
// can now exception-safely commit:
|
||||
g_atomic_deletion_sets = std::move(new_atomic_deletion_sets);
|
||||
|
||||
// Mark each sstable as being deleted from deleting_shard. We have to do
|
||||
// this in a separate pass, so the consideration whether we can delete or not
|
||||
// sees all the data from this pass.
|
||||
for (auto&& sst : atomic_deletion_set) {
|
||||
g_shards_agreeing_to_delete_sstable[sst.name].insert(deleting_shard);
|
||||
}
|
||||
|
||||
// Figure out if the (possibly merged) set can be deleted
|
||||
for (auto&& sst : merged_set->names) {
|
||||
if (g_shards_agreeing_to_delete_sstable[sst].size() != smp::count) {
|
||||
// Not everyone agrees, leave the set pending
|
||||
deletion_logger.debug("deferring deletion until all shards agree");
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
// Cannot recover from a failed deletion
|
||||
g_atomic_deletion_sets.pop_back();
|
||||
for (auto&& name : merged_set->names) {
|
||||
g_shards_agreeing_to_delete_sstable.erase(name);
|
||||
}
|
||||
|
||||
// Everyone agrees, let's delete
|
||||
delete_sstables(std::vector<sstring> tocs) {
|
||||
// FIXME: this needs to be done atomically (using a log file of sstables we intend to delete)
|
||||
parallel_for_each(merged_set->names, [] (sstring name) {
|
||||
deletion_logger.debug("deleting {}", name);
|
||||
return parallel_for_each(tocs, [] (sstring name) {
|
||||
return remove_by_toc_name(name);
|
||||
}).then_wrapped([merged_set] (future<> result) {
|
||||
deletion_logger.debug("atomic deletion completed: {}", merged_set->names);
|
||||
shared_future<> sf(std::move(result));
|
||||
for (auto&& comp : merged_set->completions) {
|
||||
sf.get_future().forward_to(std::move(*comp));
|
||||
}
|
||||
});
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static thread_local atomic_deletion_manager g_atomic_deletion_manager(smp::count, delete_sstables);
|
||||
|
||||
future<>
|
||||
delete_atomically(std::vector<sstable_to_delete> ssts) {
|
||||
auto shard = engine().cpu_id();
|
||||
return smp::submit_to(0, [=] {
|
||||
return do_delete_atomically(ssts, shard);
|
||||
return g_atomic_deletion_manager.delete_atomically(ssts, shard);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2505,16 +2422,8 @@ delete_atomically(std::vector<shared_sstable> ssts) {
|
||||
return delete_atomically(std::move(sstables_to_delete_atomically));
|
||||
}
|
||||
|
||||
void
|
||||
cancel_atomic_deletions() {
|
||||
g_atomic_deletions_cancelled = true;
|
||||
for (auto&& pd : g_atomic_deletion_sets) {
|
||||
for (auto&& c : pd->completions) {
|
||||
c->set_exception(atomic_deletion_cancelled(pd->names));
|
||||
}
|
||||
}
|
||||
g_atomic_deletion_sets.clear();
|
||||
g_shards_agreeing_to_delete_sstable.clear();
|
||||
void cancel_atomic_deletions() {
|
||||
g_atomic_deletion_manager.cancel_atomic_deletions();
|
||||
}
|
||||
|
||||
atomic_deletion_cancelled::atomic_deletion_cancelled(std::vector<sstring> names)
|
||||
|
||||
@@ -49,6 +49,7 @@
|
||||
#include "query-request.hh"
|
||||
#include "key_reader.hh"
|
||||
#include "compound_compat.hh"
|
||||
#include "atomic_deletion.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
@@ -697,14 +698,6 @@ future<> await_background_jobs();
|
||||
// Invokes await_background_jobs() on all shards
|
||||
future<> await_background_jobs_on_all_shards();
|
||||
|
||||
struct sstable_to_delete {
|
||||
sstable_to_delete(sstring name, bool shared) : name(std::move(name)), shared(shared) {}
|
||||
sstring name;
|
||||
bool shared = false;
|
||||
friend std::ostream& operator<<(std::ostream& os, const sstable_to_delete& std);
|
||||
};
|
||||
|
||||
|
||||
// When we compact sstables, we have to atomically instantiate the new
|
||||
// sstable and delete the old ones. Otherwise, if we compact A+B into C,
|
||||
// and if A contained some data that was tombstoned by B, and if B was
|
||||
@@ -723,17 +716,6 @@ struct sstable_to_delete {
|
||||
future<> delete_atomically(std::vector<shared_sstable> ssts);
|
||||
future<> delete_atomically(std::vector<sstable_to_delete> ssts);
|
||||
|
||||
class atomic_deletion_cancelled : public std::exception {
|
||||
std::string _msg;
|
||||
public:
|
||||
explicit atomic_deletion_cancelled(std::vector<sstring> names);
|
||||
template <typename StringRange>
|
||||
explicit atomic_deletion_cancelled(StringRange range)
|
||||
: atomic_deletion_cancelled(std::vector<sstring>{range.begin(), range.end()}) {
|
||||
}
|
||||
const char* what() const noexcept override;
|
||||
};
|
||||
|
||||
// Cancel any deletions scheduled by delete_atomically() and make their
|
||||
// futures complete (with an atomic_deletion_cancelled exception).
|
||||
void cancel_atomic_deletions();
|
||||
|
||||
36
test.py
36
test.py
@@ -39,6 +39,7 @@ boost_tests = [
|
||||
'storage_proxy_test',
|
||||
'schema_change_test',
|
||||
'sstable_mutation_test',
|
||||
'sstable_atomic_deletion_test',
|
||||
'commitlog_test',
|
||||
'hash_test',
|
||||
'test-serialization',
|
||||
@@ -99,6 +100,10 @@ class Alarm(Exception):
|
||||
def alarm_handler(signum, frame):
|
||||
raise Alarm
|
||||
|
||||
def boost_test_wants_double_dash(path):
    """Return True if the test binary's ``--help`` text mentions the ``--`` separator.

    Runs ``path --help`` (stderr merged into stdout) and looks for the sentence
    that describes arguments after ``--`` being ignored. Raises
    ``subprocess.CalledProcessError`` if the binary exits with a non-zero status.
    """
    help_text = subprocess.check_output([path, '--help'], stderr=subprocess.STDOUT)
    return help_text.find(b'All the arguments after the -- are ignored') != -1
|
||||
|
||||
if __name__ == "__main__":
|
||||
all_modes = ['debug', 'release']
|
||||
|
||||
@@ -130,14 +135,19 @@ if __name__ == "__main__":
|
||||
test_to_run.append((os.path.join(prefix, test), 'boost'))
|
||||
|
||||
if 'release' in modes_to_run:
|
||||
test_to_run.append(('build/release/tests/lsa_async_eviction_test -c1 -m200M --size 1024 --batch 3000 --count 2000000','other'))
|
||||
test_to_run.append(('build/release/tests/lsa_sync_eviction_test -c1 -m100M --count 10 --standard-object-size 3000000','other'))
|
||||
test_to_run.append(('build/release/tests/lsa_sync_eviction_test -c1 -m100M --count 24000 --standard-object-size 2048','other'))
|
||||
test_to_run.append(('build/release/tests/lsa_sync_eviction_test -c1 -m1G --count 4000000 --standard-object-size 128','other'))
|
||||
test_to_run.append(('build/release/tests/row_cache_alloc_stress -c1 -m1G','other'))
|
||||
test_to_run.append(('build/release/tests/sstable_test -c1','boost'))
|
||||
test_to_run.append(('build/release/tests/lsa_async_eviction_test', 'other',
|
||||
'-c1 -m200M --size 1024 --batch 3000 --count 2000000'.split()))
|
||||
test_to_run.append(('build/release/tests/lsa_sync_eviction_test', 'other',
|
||||
'-c1 -m100M --count 10 --standard-object-size 3000000'.split()))
|
||||
test_to_run.append(('build/release/tests/lsa_sync_eviction_test', 'other',
|
||||
'-c1 -m100M --count 24000 --standard-object-size 2048'.split()))
|
||||
test_to_run.append(('build/release/tests/lsa_sync_eviction_test', 'other',
|
||||
'-c1 -m1G --count 4000000 --standard-object-size 128'.split()))
|
||||
test_to_run.append(('build/release/tests/row_cache_alloc_stress', 'other',
|
||||
'-c1 -m1G'.split()))
|
||||
test_to_run.append(('build/release/tests/sstable_test', 'boost', ['-c1']))
|
||||
if 'debug' in modes_to_run:
|
||||
test_to_run.append(('build/debug/tests/sstable_test -c1','boost'))
|
||||
test_to_run.append(('build/debug/tests/sstable_test', 'boost', ['-c1']))
|
||||
|
||||
if args.name:
|
||||
test_to_run = [t for t in test_to_run if args.name in t[0]]
|
||||
@@ -150,8 +160,10 @@ if __name__ == "__main__":
|
||||
env['ASAN_OPTIONS'] = 'alloc_dealloc_mismatch=0'
|
||||
for n, test in enumerate(test_to_run):
|
||||
path = test[0]
|
||||
exec_args = test[2] if len(test) >= 3 else []
|
||||
boost_args = []
|
||||
prefix = '[%d/%d]' % (n + 1, n_total)
|
||||
path += ' --collectd 0'
|
||||
exec_args += '--collectd 0'.split()
|
||||
signal.signal(signal.SIGALRM, alarm_handler)
|
||||
if args.jenkins and test[1] == 'boost':
|
||||
mode = 'release'
|
||||
@@ -159,9 +171,11 @@ if __name__ == "__main__":
|
||||
mode = 'debug'
|
||||
xmlout = (args.jenkins + "." + mode + "." +
|
||||
os.path.basename(test[0].split()[0]) + ".boost.xml")
|
||||
path = path + " --output_format=XML --log_level=test_suite --report_level=no --log_sink=" + xmlout
|
||||
print_status('%s RUNNING %s' % (prefix, path))
|
||||
proc = subprocess.Popen(shlex.split(path), stdout=subprocess.PIPE,
|
||||
boost_args += ['--output_format=XML', '--log_level=test_suite', '--report_level=no', '--log_sink=' + xmlout]
|
||||
print_status('%s RUNNING %s %s' % (prefix, path, ' '.join(boost_args + exec_args)))
|
||||
if test[1] == 'boost' and boost_test_wants_double_dash(path):
|
||||
boost_args += ['--']
|
||||
proc = subprocess.Popen([path] + boost_args + exec_args, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
env=env, preexec_fn=os.setsid)
|
||||
out = None
|
||||
|
||||
@@ -277,6 +277,21 @@ SEASTAR_TEST_CASE(test_commitlog_delete_when_over_disk_limit) {
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_commitlog_reader){
|
||||
static auto count_mutations_in_segment = [] (sstring path) -> future<size_t> {
|
||||
auto count = make_lw_shared<size_t>(0);
|
||||
return db::commitlog::read_log_file(path, [count](temporary_buffer<char> buf, db::replay_position rp) {
|
||||
sstring str(buf.get(), buf.size());
|
||||
BOOST_CHECK_EQUAL(str, "hej bubba cow");
|
||||
(*count)++;
|
||||
return make_ready_future<>();
|
||||
}).then([](auto s) {
|
||||
return do_with(std::move(s), [](auto& s) {
|
||||
return s->done();
|
||||
});
|
||||
}).then([count] {
|
||||
return *count;
|
||||
});
|
||||
};
|
||||
commitlog::config cfg;
|
||||
cfg.commitlog_segment_size_in_mb = 1;
|
||||
return cl_test(cfg, [](commitlog& log) {
|
||||
@@ -309,18 +324,19 @@ SEASTAR_TEST_CASE(test_commitlog_reader){
|
||||
if (i == segments.end()) {
|
||||
throw std::runtime_error("Did not find expected log file");
|
||||
}
|
||||
return db::commitlog::read_log_file(*i, [count2](temporary_buffer<char> buf, db::replay_position rp) {
|
||||
sstring str(buf.get(), buf.size());
|
||||
BOOST_CHECK_EQUAL(str, "hej bubba cow");
|
||||
(*count2)++;
|
||||
return make_ready_future<>();
|
||||
}).then([](auto s) {
|
||||
return do_with(std::move(s), [](auto& s) {
|
||||
return s->done();
|
||||
});
|
||||
return *i;
|
||||
}).then([&log, count] (sstring segment_path) {
|
||||
// Check reading from an unsynced segment
|
||||
return count_mutations_in_segment(segment_path).then([count] (size_t replay_count) {
|
||||
BOOST_CHECK_GE(*count, replay_count);
|
||||
}).then([&log, count, segment_path] {
|
||||
return log.sync_all_segments().then([count, segment_path] {
|
||||
// Check reading from a synced segment
|
||||
return count_mutations_in_segment(segment_path).then([count] (size_t replay_count) {
|
||||
BOOST_CHECK_EQUAL(*count, replay_count);
|
||||
});
|
||||
}).then([count, count2] {
|
||||
BOOST_CHECK_EQUAL(*count, *count2);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
170
tests/sstable_atomic_deletion_test.cc
Normal file
170
tests/sstable_atomic_deletion_test.cc
Normal file
@@ -0,0 +1,170 @@
|
||||
/*
|
||||
* Copyright (C) 2015 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "sstables/atomic_deletion.hh"
|
||||
#include <seastar/tests/test-utils.hh>
|
||||
#include <deque>
|
||||
#include <boost/range/numeric.hpp>
|
||||
#include <boost/range/adaptor/transformed.hpp>
|
||||
|
||||
using namespace sstables;
|
||||
|
||||
|
||||
class atomic_deletion_test_env {
|
||||
public:
|
||||
using event = std::function<future<> (atomic_deletion_test_env& adm)>;
|
||||
private:
|
||||
struct a_hash {
|
||||
size_t operator()(const std::unordered_set<sstring>& s) const {
|
||||
auto h = std::hash<sstring>();
|
||||
return boost::accumulate(s | boost::adaptors::transformed(h), size_t(0)); // sue me
|
||||
}
|
||||
};
|
||||
atomic_deletion_manager _adm;
|
||||
std::deque<event> _events;
|
||||
std::unordered_set<std::unordered_set<sstring>, a_hash> _deletes;
|
||||
semaphore _deletion_counter { 0 };
|
||||
private:
|
||||
future<> delete_sstables(std::vector<sstring> names) {
|
||||
auto&& s1 = boost::copy_range<std::unordered_set<sstring>>(names);
|
||||
_deletes.insert(s1);
|
||||
_deletion_counter.signal();
|
||||
return make_ready_future<>();
|
||||
}
|
||||
public:
|
||||
explicit atomic_deletion_test_env(unsigned shard_count, std::vector<event> events)
|
||||
: _adm(shard_count, [this] (std::vector<sstring> names) {
|
||||
return delete_sstables(names);
|
||||
})
|
||||
, _events(events.begin(), events.end()) {
|
||||
}
|
||||
void expect_no_deletion() {
|
||||
BOOST_REQUIRE(_deletes.empty());
|
||||
}
|
||||
future<> schedule_delete(std::vector<sstable_to_delete> names, unsigned shard) {
|
||||
_adm.delete_atomically(names, shard).discard_result();
|
||||
return make_ready_future<>();
|
||||
}
|
||||
future<> expect_deletion(std::vector<sstring> names) {
|
||||
return _deletion_counter.wait().then([this, names] {
|
||||
auto&& s1 = boost::copy_range<std::unordered_set<sstring>>(names);
|
||||
auto erased = _deletes.erase(s1);
|
||||
BOOST_REQUIRE_EQUAL(erased, 1);
|
||||
});
|
||||
}
|
||||
future<> test() {
|
||||
// run all _events sequentially
|
||||
return repeat([this] {
|
||||
if (_events.empty()) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
auto ev = std::move(_events.front());
|
||||
_events.pop_front();
|
||||
return ev(*this).then([] {
|
||||
return stop_iteration::no;
|
||||
});
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
// Builds a test environment with `shards` shards, runs the given event script
// in it, and keeps the environment alive until the script has finished.
future<> test_atomic_deletion_manager(unsigned shards, std::vector<atomic_deletion_test_env::event> events) {
    auto harness = make_lw_shared<atomic_deletion_test_env>(shards, events);
    auto outcome = harness->test();
    // The empty continuation only exists to hold a reference to the harness.
    return outcome.finally([harness] {});
}
|
||||
|
||||
// Event: after checking that no earlier deletion is still unaccounted for,
// schedule the deletion of the sstables in `v` on behalf of `shard`.
atomic_deletion_test_env::event
delete_many(std::vector<sstable_to_delete> v, unsigned shard) {
    auto step = [v, shard] (atomic_deletion_test_env& harness) {
        // a previous request must not have triggered a deletion early
        harness.expect_no_deletion();
        return harness.schedule_delete(v, shard);
    };
    return step;
}
|
||||
|
||||
// Event: same as delete_many(), for a single sstable.
atomic_deletion_test_env::event
delete_one(sstable_to_delete s, unsigned shard) {
    std::vector<sstable_to_delete> single;
    single.push_back(s);
    return delete_many(single, shard);
}
|
||||
|
||||
// Event: wait for a deletion and require that it covered exactly `names`.
atomic_deletion_test_env::event
expect_many(std::vector<sstring> names) {
    auto step = [names] (atomic_deletion_test_env& harness) {
        return harness.expect_deletion(names);
    };
    return step;
}
|
||||
|
||||
// Event: same as expect_many(), for a single name.
atomic_deletion_test_env::event
expect_one(sstring name) {
    return expect_many(std::vector<sstring>(1, name));
}
|
||||
|
||||
// One shard: each request (unshared "1", then shared "2") is expected to be
// followed by the deletion of exactly that sstable.
SEASTAR_TEST_CASE(test_single_shard_single_sstable) {
    std::vector<atomic_deletion_test_env::event> script;
    script.push_back(delete_one({"1", false}, 0));
    script.push_back(expect_one("1"));
    script.push_back(delete_one({"2", true}, 0));
    script.push_back(expect_one("2"));
    return test_atomic_deletion_manager(1, script);
}
|
||||
|
||||
// Three shards: shared "1" is expected to be deleted only after shards 0, 1 and 2
// have all requested it; unshared "2" is expected after a single request.
SEASTAR_TEST_CASE(test_multi_shard_single_sstable) {
    std::vector<atomic_deletion_test_env::event> script;
    script.push_back(delete_one({"1", true}, 0));
    script.push_back(delete_one({"1", true}, 1));
    script.push_back(delete_one({"1", true}, 2));
    script.push_back(expect_one("1"));
    script.push_back(delete_one({"2", false}, 1));
    script.push_back(expect_one("2"));
    return test_atomic_deletion_manager(3, script);
}
|
||||
|
||||
// Five shards: a single request from shard 2 for three unshared sstables is
// expected to result in one deletion covering all three.
SEASTAR_TEST_CASE(test_nonshared_compaction) {
    std::vector<atomic_deletion_test_env::event> script;
    script.push_back(delete_many({{"1", false}, {"2", false}, {"3", false}}, 2));
    script.push_back(expect_many({"1", "2", "3"}));
    return test_atomic_deletion_manager(5, script);
}
|
||||
|
||||
// Three shards: shared "1" is requested by shards 0, 2 and 1 in turn, with
// shard 2 bundling it with unshared "2" and "3". A single deletion covering
// all three names is expected once the last shard has asked.
SEASTAR_TEST_CASE(test_shared_compaction) {
    std::vector<atomic_deletion_test_env::event> script;
    script.push_back(delete_one({"1", true}, 0));
    script.push_back(delete_many({{"1", true}, {"2", false}, {"3", false}}, 2));
    script.push_back(delete_one({"1", true}, 1));
    script.push_back(expect_many({"1", "2", "3"}));
    return test_atomic_deletion_manager(3, script);
}
|
||||
|
||||
// Three shards: requests that overlap on shared "1" and "3" (and also carry
// unshared "2" and "4") are issued from shards 0, 2 and 1. A single deletion
// covering "1" through "4" is expected after the final request.
SEASTAR_TEST_CASE(test_overlapping_compaction) {
    std::vector<atomic_deletion_test_env::event> script;
    script.push_back(delete_one({"1", true}, 0));
    script.push_back(delete_one({"3", true}, 0));
    script.push_back(delete_many({{"1", true}, {"2", false}, {"3", true}}, 2));
    script.push_back(delete_one({"1", true}, 1));
    script.push_back(delete_many({{"3", true}, {"4", false}}, 1));
    script.push_back(expect_many({"1", "2", "3", "4"}));
    return test_atomic_deletion_manager(3, script);
}
|
||||
|
||||
|
||||
#include "disk-error-handler.hh"
|
||||
|
||||
thread_local disk_error_signal_type commit_error;
|
||||
thread_local disk_error_signal_type general_disk_error;
|
||||
@@ -1252,7 +1252,9 @@ static future<std::vector<unsigned long>> compact_sstables(std::vector<unsigned
|
||||
}
|
||||
auto candidates = get_candidates_for_leveled_strategy(*cf);
|
||||
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, 1);
|
||||
auto candidate = manifest.get_compaction_candidates();
|
||||
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
|
||||
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
|
||||
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
|
||||
BOOST_REQUIRE(candidate.sstables.size() == sstables->size());
|
||||
BOOST_REQUIRE(candidate.level == 1);
|
||||
BOOST_REQUIRE(candidate.max_sstable_bytes == 1024*1024);
|
||||
@@ -1731,7 +1733,9 @@ SEASTAR_TEST_CASE(leveled_01) {
|
||||
auto candidates = get_candidates_for_leveled_strategy(*cf);
|
||||
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, max_sstable_size_in_mb);
|
||||
BOOST_REQUIRE(manifest.get_level_size(0) == 2);
|
||||
auto candidate = manifest.get_compaction_candidates();
|
||||
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
|
||||
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
|
||||
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
|
||||
BOOST_REQUIRE(candidate.sstables.size() == 2);
|
||||
BOOST_REQUIRE(candidate.level == 0);
|
||||
|
||||
@@ -1786,7 +1790,9 @@ SEASTAR_TEST_CASE(leveled_02) {
|
||||
auto candidates = get_candidates_for_leveled_strategy(*cf);
|
||||
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, max_sstable_size_in_mb);
|
||||
BOOST_REQUIRE(manifest.get_level_size(0) == 3);
|
||||
auto candidate = manifest.get_compaction_candidates();
|
||||
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
|
||||
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
|
||||
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
|
||||
BOOST_REQUIRE(candidate.sstables.size() == 3);
|
||||
BOOST_REQUIRE(candidate.level == 0);
|
||||
|
||||
@@ -1844,7 +1850,9 @@ SEASTAR_TEST_CASE(leveled_03) {
|
||||
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, max_sstable_size_in_mb);
|
||||
BOOST_REQUIRE(manifest.get_level_size(0) == 2);
|
||||
BOOST_REQUIRE(manifest.get_level_size(1) == 2);
|
||||
auto candidate = manifest.get_compaction_candidates();
|
||||
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
|
||||
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
|
||||
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
|
||||
BOOST_REQUIRE(candidate.sstables.size() == 3);
|
||||
BOOST_REQUIRE(candidate.level == 1);
|
||||
|
||||
@@ -1914,7 +1922,9 @@ SEASTAR_TEST_CASE(leveled_04) {
|
||||
auto level2_score = (double) manifest.get_total_bytes(manifest.get_level(2)) / (double) manifest.max_bytes_for_level(2);
|
||||
BOOST_REQUIRE(level2_score < 1.001);
|
||||
|
||||
auto candidate = manifest.get_compaction_candidates();
|
||||
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
|
||||
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
|
||||
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
|
||||
BOOST_REQUIRE(candidate.sstables.size() == 2);
|
||||
BOOST_REQUIRE(candidate.level == 2);
|
||||
|
||||
@@ -1976,7 +1986,9 @@ SEASTAR_TEST_CASE(leveled_06) {
|
||||
BOOST_REQUIRE(manifest.get_level_size(1) == 1);
|
||||
BOOST_REQUIRE(manifest.get_level_size(2) == 0);
|
||||
|
||||
auto candidate = manifest.get_compaction_candidates();
|
||||
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
|
||||
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
|
||||
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
|
||||
BOOST_REQUIRE(candidate.level == 2);
|
||||
BOOST_REQUIRE(candidate.sstables.size() == 1);
|
||||
auto& sst = (candidate.sstables)[0];
|
||||
|
||||
@@ -1474,12 +1474,13 @@ private:
|
||||
class column_visitor : public Aggregator {
|
||||
const schema& _s;
|
||||
const query::partition_slice& _slice;
|
||||
uint32_t _cell_limit;
|
||||
const uint32_t _cell_limit;
|
||||
uint32_t _current_cell_limit;
|
||||
std::vector<std::pair<std::string, typename Aggregator::type>> _aggregation;
|
||||
typename Aggregator::type* _current_aggregation;
|
||||
public:
|
||||
column_visitor(const schema& s, const query::partition_slice& slice, uint32_t cell_limit)
|
||||
: _s(s), _slice(slice), _cell_limit(cell_limit)
|
||||
: _s(s), _slice(slice), _cell_limit(cell_limit), _current_cell_limit(0)
|
||||
{ }
|
||||
std::vector<std::pair<std::string, typename Aggregator::type>>&& release() {
|
||||
return std::move(_aggregation);
|
||||
@@ -1492,6 +1493,7 @@ private:
|
||||
void accept_new_partition(const partition_key& key, uint32_t row_count) {
|
||||
_aggregation.emplace_back(partition_key_to_string(_s, key), typename Aggregator::type());
|
||||
_current_aggregation = &_aggregation.back().second;
|
||||
_current_cell_limit = _cell_limit;
|
||||
}
|
||||
void accept_new_partition(uint32_t row_count) {
|
||||
// We always ask for the partition_key to be sent in query_opts().
|
||||
@@ -1500,19 +1502,19 @@ private:
|
||||
void accept_new_row(const clustering_key_prefix& key, const query::result_row_view& static_row, const query::result_row_view& row) {
|
||||
auto it = row.iterator();
|
||||
auto cell = it.next_atomic_cell();
|
||||
if (cell && _cell_limit > 0) {
|
||||
if (cell && _current_cell_limit > 0) {
|
||||
bytes column_name = composite::serialize_value(key.components(), _s.thrift().has_compound_comparator());
|
||||
Aggregator::on_column(_current_aggregation, column_name, *cell);
|
||||
_cell_limit -= 1;
|
||||
_current_cell_limit -= 1;
|
||||
}
|
||||
}
|
||||
void accept_new_row(const query::result_row_view& static_row, const query::result_row_view& row) {
|
||||
auto it = row.iterator();
|
||||
for (auto&& id : _slice.regular_columns) {
|
||||
auto cell = it.next_atomic_cell();
|
||||
if (cell && _cell_limit > 0) {
|
||||
if (cell && _current_cell_limit > 0) {
|
||||
Aggregator::on_column(_current_aggregation, _s.regular_column_at(id).name(), *cell);
|
||||
_cell_limit -= 1;
|
||||
_current_cell_limit -= 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -600,4 +600,62 @@ inline void stop_foreground(const trace_state_ptr& state) {
|
||||
state->stop_foreground_and_write();
|
||||
}
|
||||
}
|
||||
|
||||
// global_trace_state_ptr is a helper class that may be used for creating spans
|
||||
// of an existing tracing session on other shards. When a tracing span on a
|
||||
// different shard is needed global_trace_state_ptr would create a secondary
|
||||
// tracing session on that shard similarly to what we do when we create tracing
|
||||
// spans on remote Nodes.
|
||||
//
|
||||
// The usage is straightforward:
|
||||
// 1. Create a global_trace_state_ptr from the existing trace_state_ptr object.
|
||||
// 2. Pass the global_trace_state_ptr object, instead of a trace_state_ptr
|
||||
// object, to the execution unit that (possibly) runs on a different
|
||||
// shard.
|
||||
class global_trace_state_ptr {
|
||||
// CPU id (engine().cpu_id()) recorded at the moment this object was constructed.
unsigned _cpu_of_origin;
|
||||
// Wrapped trace state; get() treats a null value as the "tracing not enabled" case.
trace_state_ptr _ptr;
|
||||
public:
|
||||
// Note: the trace_state_ptr must come from the current shard
|
||||
global_trace_state_ptr(trace_state_ptr t)
|
||||
: _cpu_of_origin(engine().cpu_id())
|
||||
, _ptr(std::move(t))
|
||||
{ }
|
||||
|
||||
// May be invoked across shards.
|
||||
// Delegates to the constructor above with other.get(), so the new object
// records the CPU performing the copy as its CPU of origin.
global_trace_state_ptr(const global_trace_state_ptr& other)
|
||||
: global_trace_state_ptr(other.get())
|
||||
{ }
|
||||
|
||||
// May be invoked across shards.
|
||||
// Behaves like the copy constructor: it wraps other.get() and does not
// modify 'other' (get() is const).
global_trace_state_ptr(global_trace_state_ptr&& other)
|
||||
: global_trace_state_ptr(other.get())
|
||||
{ }
|
||||
|
||||
// Copy assignment is disabled.
global_trace_state_ptr& operator=(const global_trace_state_ptr&) = delete;
|
||||
|
||||
// May be invoked across shards.
|
||||
// Returns _ptr when called on the CPU of origin. On any other CPU it builds
// trace info from _ptr, creates a session from it on the local tracing
// instance, begins that session and returns it. Returns nullptr when _ptr is
// null or when make_trace_info() yields no trace info.
trace_state_ptr get() const {
|
||||
// optimize the "tracing not enabled" case
|
||||
if (!_ptr) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Called on a CPU other than the one this object was constructed on.
if (_cpu_of_origin != engine().cpu_id()) {
|
||||
auto opt_trace_info = make_trace_info(_ptr);
|
||||
if (opt_trace_info) {
|
||||
trace_state_ptr new_trace_state = tracing::get_local_tracing_instance().create_session(*opt_trace_info);
|
||||
begin(new_trace_state);
|
||||
return new_trace_state;
|
||||
} else {
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
// Same CPU as at construction: hand back the wrapped pointer itself.
return _ptr;
|
||||
}
|
||||
|
||||
// May be invoked across shards.
|
||||
// Implicit conversion to trace_state_ptr; equivalent to calling get().
operator trace_state_ptr() const { return get(); }
|
||||
};
|
||||
}
|
||||
|
||||
@@ -652,13 +652,13 @@ future<> cql_server::connection::process_request() {
|
||||
f.length, mem_estimate, _server._max_request_size));
|
||||
}
|
||||
|
||||
return with_semaphore(_server._memory_available, mem_estimate, [this, length = f.length, flags = f.flags, op, stream, tracing_requested] {
|
||||
return read_and_decompress_frame(length, flags).then([this, flags, op, stream, tracing_requested] (temporary_buffer<char> buf) {
|
||||
return get_units(_server._memory_available, mem_estimate).then([this, length = f.length, flags = f.flags, op, stream, tracing_requested] (semaphore_units<> mem_permit) {
|
||||
return this->read_and_decompress_frame(length, flags).then([this, flags, op, stream, tracing_requested, mem_permit = std::move(mem_permit)] (temporary_buffer<char> buf) mutable {
|
||||
|
||||
++_server._requests_served;
|
||||
++_server._requests_serving;
|
||||
|
||||
with_gate(_pending_requests_gate, [this, flags, op, stream, buf = std::move(buf), tracing_requested] () mutable {
|
||||
with_gate(_pending_requests_gate, [this, flags, op, stream, buf = std::move(buf), tracing_requested, mem_permit = std::move(mem_permit)] () mutable {
|
||||
auto bv = bytes_view{reinterpret_cast<const int8_t*>(buf.begin()), buf.size()};
|
||||
auto cpu = pick_request_cpu();
|
||||
return smp::submit_to(cpu, [this, bv = std::move(bv), op, stream, client_state = _client_state, tracing_requested] () mutable {
|
||||
@@ -672,7 +672,7 @@ future<> cql_server::connection::process_request() {
|
||||
}).then([this, flags] (auto&& response) {
|
||||
_client_state.merge(response.second);
|
||||
return this->write_response(std::move(response.first), _compression);
|
||||
}).then([buf = std::move(buf)] {
|
||||
}).then([buf = std::move(buf), mem_permit = std::move(mem_permit)] {
|
||||
// Keep buf alive.
|
||||
});
|
||||
}).handle_exception([] (std::exception_ptr ex) {
|
||||
@@ -1504,7 +1504,11 @@ std::vector<char> cql_server::response::compress_lz4(const std::vector<char>& bo
|
||||
output[1] = (input_len >> 16) & 0xFF;
|
||||
output[2] = (input_len >> 8) & 0xFF;
|
||||
output[3] = input_len & 0xFF;
|
||||
#ifdef HAVE_LZ4_COMPRESS_DEFAULT
|
||||
auto ret = LZ4_compress_default(input, output + 4, input_len, LZ4_compressBound(input_len));
|
||||
#else
|
||||
auto ret = LZ4_compress(input, output + 4, input_len);
|
||||
#endif
|
||||
if (ret == 0) {
|
||||
throw std::runtime_error("CQL frame LZ4 compression failure");
|
||||
}
|
||||
|
||||
@@ -39,8 +39,8 @@ class moving_average {
|
||||
public:
|
||||
moving_average(latency_counter::duration interval, latency_counter::duration tick_interval) :
|
||||
_tick_interval(tick_interval) {
|
||||
_alpha = 1 - std::exp(-std::chrono::duration_cast<std::chrono::nanoseconds>(interval).count()/
|
||||
static_cast<double>(std::chrono::duration_cast<std::chrono::nanoseconds>(tick_interval).count()));
|
||||
_alpha = 1 - std::exp(-std::chrono::duration_cast<std::chrono::seconds>(tick_interval).count()/
|
||||
static_cast<double>(std::chrono::duration_cast<std::chrono::seconds>(interval).count()));
|
||||
}
|
||||
|
||||
void add(uint64_t val = 1) {
|
||||
@@ -48,7 +48,7 @@ public:
|
||||
}
|
||||
|
||||
void update() {
|
||||
double instant_rate = _count / static_cast<double>(std::chrono::duration_cast<std::chrono::nanoseconds>(_tick_interval).count());
|
||||
double instant_rate = _count / static_cast<double>(std::chrono::duration_cast<std::chrono::seconds>(_tick_interval).count());
|
||||
if (_initialized) {
|
||||
_rate += (_alpha * (instant_rate - _rate));
|
||||
} else {
|
||||
@@ -70,7 +70,8 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
class ihistogram {
|
||||
template <typename Unit>
|
||||
class basic_ihistogram {
|
||||
public:
|
||||
// count holds all the events
|
||||
int64_t count;
|
||||
@@ -84,12 +85,13 @@ public:
|
||||
double variance;
|
||||
int64_t sample_mask;
|
||||
boost::circular_buffer<int64_t> sample;
|
||||
ihistogram(size_t size = 1024, int64_t _sample_mask = 0x80)
|
||||
basic_ihistogram(size_t size = 1024, int64_t _sample_mask = 0x80)
|
||||
: count(0), total(0), min(0), max(0), sum(0), started(0), mean(0), variance(0),
|
||||
sample_mask(_sample_mask), sample(
|
||||
size) {
|
||||
}
|
||||
void mark(int64_t value) {
|
||||
void mark(int64_t ns_value) {
|
||||
auto value = std::chrono::duration_cast<Unit>(std::chrono::nanoseconds(ns_value)).count();
|
||||
if (total == 0 || value < min) {
|
||||
min = value;
|
||||
}
|
||||
@@ -131,7 +133,7 @@ public:
|
||||
/**
|
||||
* Set the latency according to the sample rate.
|
||||
*/
|
||||
ihistogram& set_latency(latency_counter& lc) {
|
||||
basic_ihistogram& set_latency(latency_counter& lc) {
|
||||
if (should_sample()) {
|
||||
lc.start();
|
||||
}
|
||||
@@ -144,7 +146,7 @@ public:
|
||||
* Increment the total number of events without
|
||||
* sampling the value.
|
||||
*/
|
||||
ihistogram& inc() {
|
||||
basic_ihistogram& inc() {
|
||||
count++;
|
||||
return *this;
|
||||
}
|
||||
@@ -157,7 +159,7 @@ public:
|
||||
return a * a;
|
||||
}
|
||||
|
||||
ihistogram& operator +=(const ihistogram& o) {
|
||||
basic_ihistogram& operator +=(const basic_ihistogram& o) {
|
||||
if (count == 0) {
|
||||
*this = o;
|
||||
} else if (o.count > 0) {
|
||||
@@ -190,14 +192,18 @@ public:
|
||||
return mean * count;
|
||||
}
|
||||
|
||||
friend ihistogram operator +(ihistogram a, const ihistogram& b);
|
||||
template <typename U>
|
||||
friend basic_ihistogram<U> operator +(basic_ihistogram<U> a, const basic_ihistogram<U>& b);
|
||||
};
|
||||
|
||||
inline ihistogram operator +(ihistogram a, const ihistogram& b) {
|
||||
template <typename Unit>
|
||||
inline basic_ihistogram<Unit> operator +(basic_ihistogram<Unit> a, const basic_ihistogram<Unit>& b) {
|
||||
a += b;
|
||||
return a;
|
||||
}
|
||||
|
||||
using ihistogram = basic_ihistogram<std::chrono::microseconds>;
|
||||
|
||||
struct rate_moving_average {
|
||||
uint64_t count = 0;
|
||||
double rates[3] = {0};
|
||||
@@ -222,7 +228,7 @@ class timed_rate_moving_average {
|
||||
static constexpr latency_counter::duration tick_interval() {
|
||||
return std::chrono::seconds(10);
|
||||
}
|
||||
moving_average rates[3] = {{tick_interval(), std::chrono::minutes(1)}, {tick_interval(), std::chrono::minutes(5)}, {tick_interval(), std::chrono::minutes(15)}};
|
||||
moving_average rates[3] = {{std::chrono::minutes(1), tick_interval()}, {std::chrono::minutes(5), tick_interval()}, {std::chrono::minutes(15), tick_interval()}};
|
||||
latency_counter::time_point start_time;
|
||||
timer<> _timer;
|
||||
|
||||
@@ -246,7 +252,7 @@ public:
|
||||
rate_moving_average rate() const {
|
||||
rate_moving_average res;
|
||||
if (_count > 0) {
|
||||
double elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(latency_counter::now() - start_time).count();
|
||||
double elapsed = std::chrono::duration_cast<std::chrono::seconds>(latency_counter::now() - start_time).count();
|
||||
res.mean_rate = (_count / elapsed);
|
||||
}
|
||||
res.count = _count;
|
||||
|
||||
Reference in New Issue
Block a user