Compare commits

...

14 Commits

Author SHA1 Message Date
Pekka Enberg
bf71575fd7 release: prepare for 0.18.1 2016-03-05 08:53:07 +02:00
Gleb Natapov
cd75075214 storage_proxy: fix race between read cl completion and timeout in digest resolver
If a timeout happens after the cl promise is fulfilled, but before
its continuation runs, the timeout handler removes all the data that
the cl continuation needs to calculate the result. Fix this by
calculating the result immediately and returning it in the cl promise
instead of delaying the work until the continuation runs. This has a
nice side effect of simplifying digest mismatch handling and making
it exception-free.

Fixes #977.

Message-Id: <1457015870-2106-3-git-send-email-gleb@scylladb.com>
(cherry picked from commit b89b6f442b)
2016-03-03 17:10:38 +02:00
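The race described above can be modeled deterministically: the old scheme fulfilled an empty promise and let a later continuation read shared state that the timeout path may already have cleared, while the fixed scheme computes the result eagerly and carries it in the promise itself. A minimal single-threaded sketch with illustrative names (not the actual Scylla resolver types):

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <string>
#include <vector>

// Toy model of the digest resolver's cl promise (illustrative only).
struct resolver {
    std::vector<std::string> data_results;     // replica data replies
    std::optional<std::string> promised_value; // value carried by the cl promise
    std::function<void()> continuation;        // runs some time after cl is reached

    // Old scheme: fulfil an empty promise; the continuation reads shared state later.
    void reach_cl_old() {
        continuation = [this] {
            // By the time this runs, on_timeout() may already have cleared
            // data_results, so this read races with the timeout path.
            if (!data_results.empty()) promised_value = data_results.front();
        };
    }

    // Fixed scheme: compute the result immediately and carry it in the promise.
    void reach_cl_new() {
        promised_value = data_results.front();
        continuation = [] { /* nothing left to race with */ };
    }

    void on_timeout() { data_results.clear(); } // "we will not need them any more"
};
```

Running the timeout between promise fulfilment and the continuation shows the old scheme losing the value while the new one keeps it.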
Gleb Natapov
e85f11566b storage_proxy: store only one data reply in digest resolver.
A read executor may ask for more than one data reply during the
digest resolving stage, but only one result is actually needed to
satisfy a query, so there is no need to store all of them.

Message-Id: <1457015870-2106-2-git-send-email-gleb@scylladb.com>
(cherry picked from commit e4ac5157bc)
2016-03-03 17:10:32 +02:00
Gleb Natapov
8f682f018e storage_proxy: fix cl achieved condition in digest resolver timeout handler
In the digest resolver, for cl to be achieved it is not enough to get
the correct number of replies; a data reply must also be among them.
The condition in the digest timeout handler does not check that.
Fortunately, we have a variable that is set to true when cl is
achieved, so use it instead.

Message-Id: <1457015870-2106-1-git-send-email-gleb@scylladb.com>
(cherry picked from commit 69b61b81ce)
2016-03-03 17:10:26 +02:00
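The buggy condition can be seen in a toy model: with digest-only replies, a reply-count check alone claims cl was achieved, while the flag set at the moment cl (enough replies plus a data reply) was actually reached does not. A sketch with illustrative names:

```cpp
#include <cassert>
#include <cstddef>

// Toy model of the digest resolver's cl bookkeeping (illustrative only).
struct digest_state {
    std::size_t block_for;
    std::size_t cl_responses = 0;
    bool has_data = false;    // did we get at least one data reply?
    bool cl_reported = false; // set at the moment cl was truly achieved

    // Called on each reply: cl is achieved only when we have enough replies
    // AND at least one of them is a data reply, not just digests.
    void got_response() {
        ++cl_responses;
        if (cl_responses >= block_for && has_data) cl_reported = true;
    }
    // Buggy timeout condition: reply count alone looks like "cl achieved".
    bool cl_achieved_wrong() const { return cl_responses >= block_for; }
    // Fixed condition: reuse the flag set when cl really was achieved.
    bool cl_achieved_right() const { return cl_reported; }
};
```

Two digest-only replies satisfy the wrong check but not the right one.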
Tomasz Grabiec
dba2b617e7 db: Fix error handling in populate_keyspace()
When find_uuid() fails, Scylla would terminate with:

  Exiting on unhandled exception of type 'std::out_of_range': _Map_base::at

But we are supposed to ignore directories for unknown column
families. The try {} catch block does just that when
no_such_column_family is thrown from the find_column_family() call
that follows find_uuid(). Fix by converting std::out_of_range to
no_such_column_family.

Message-Id: <1456056280-3933-1-git-send-email-tgrabiec@scylladb.com>
2016-03-03 11:37:26 +02:00
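The shape of the fix — translating the container's std::out_of_range into the domain exception the caller already handles — can be sketched standalone (the map, key format, and exception text here are illustrative, not the real Scylla types):

```cpp
#include <cassert>
#include <exception>
#include <map>
#include <stdexcept>
#include <string>

struct no_such_column_family : std::runtime_error {
    no_such_column_family(const std::string& ks, const std::string& cf)
        : std::runtime_error("no such column family " + ks + "." + cf) {}
};

// Convert map::at's std::out_of_range into the domain exception the caller's
// existing try/catch already ignores.
inline int find_uuid(const std::map<std::string, int>& m,
                     const std::string& ks, const std::string& cf) {
    try {
        return m.at(ks + "/" + cf); // throws std::out_of_range when absent
    } catch (const std::out_of_range&) {
        // Re-throw as the expected type, preserving the original as a
        // nested exception for diagnostics.
        std::throw_with_nested(no_such_column_family(ks, cf));
    }
}
```

Callers that catch no_such_column_family now handle the missing-key case instead of terminating on an unhandled std::out_of_range.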
Paweł Dziepak
f4e11007cf Revert "do not use boost::multiprecision::msb()"
This reverts commit dadd097f9c.

That commit caused the serialized forms of varint and decimal to have
some excess leading zeros. They didn't affect deserialization in any
way, but they caused computed tokens to differ from the Cassandra ones.

Fixes #898.

Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
Message-Id: <1455537278-20106-1-git-send-email-pdziepak@scylladb.com>
2016-03-03 10:54:19 +02:00
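The restored size computation (visible in the last diff of this compare) rounds msb(|n|) + 2 bits up to whole bytes, yielding the minimal two's-complement length with no excess leading zeros. A portable sketch for magnitudes that fit in 64 bits (the real code uses boost::multiprecision):

```cpp
#include <cassert>
#include <cstdint>

// Minimal two's-complement serialized size of a positive integer:
// index of the highest set bit, plus a sign bit, rounded up to bytes.
inline uint32_t serialized_size(uint64_t magnitude) {
    if (magnitude == 0) return 1;
    uint32_t msb = 0;
    for (uint64_t v = magnitude; v >>= 1; ) ++msb; // highest set bit index
    return (msb + 2 + 7) / 8;                      // align_up(msb + 2, 8) / 8
}
```

For example, 127 (0x7f) fits in one byte, while 128 needs a leading zero byte to keep the sign bit clear.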
Asias He
fdfa1df395 locator: Fix get token from a range<token>
With a range{t1, t2}, if t2 == {}, then range.end() will contain no
value. Fix getting t2 in this case.

Fixes #911.
Message-Id: <4462e499d706d275c03b116c4645e8aaee7821e1.1456128310.git.asias@scylladb.com>
2016-03-03 10:53:21 +02:00
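The fix's pattern — mapping an absent end bound to the maximum token instead of dereferencing blindly — can be sketched with std::optional (the token type and sentinel value here are illustrative stand-ins for dht::token and dht::maximum_token()):

```cpp
#include <cassert>
#include <optional>

using token = long;
constexpr token maximum_token = 9223372036854775807L; // illustrative sentinel

// A range end of {} means "unbounded"; treat it as the maximum token
// rather than dereferencing an empty optional.
inline token end_token(const std::optional<token>& end) {
    return end ? *end : maximum_token;
}
```

Every caller that previously did `r.end()->value()` goes through this guard instead.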
Tomasz Grabiec
116055cc6f bytes_ostream: Avoid recursion when freeing chunks
When there are a lot of chunks we may get a stack overflow.

This seems to fix issue #906, a memory corruption during schema
merge. I suspect that what causes the corruption there is overflowing
the stack allocated for the seastar thread. Those stacks don't have
red zones that would catch an overflow.

Message-Id: <1456056288-3983-1-git-send-email-tgrabiec@scylladb.com>
2016-03-03 10:53:01 +02:00
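The fix replaces recursive destruction of the chunk chain with an iterative unlink loop, so destroying a chain of any length uses O(1) stack instead of one ~chunk() frame per node. A minimal standalone sketch (illustrative type, with a counter added for demonstration):

```cpp
#include <cassert>
#include <cstddef>
#include <memory>

// Sketch of the bytes_ostream chunk chain (not the real type).
struct chunk {
    std::unique_ptr<chunk> next;
    static inline std::size_t freed = 0; // instrumentation for this sketch only
    ~chunk() {
        ++freed;
        // Unlink iteratively: every node is destroyed with next == nullptr,
        // so ~chunk() never recurses down the chain.
        auto p = std::move(next);
        while (p) {
            auto p_next = std::move(p->next); // detach before p is destroyed
            p = std::move(p_next);            // destroys the detached node
        }
    }
};

inline void build_and_drop(std::size_t n) {
    auto head = std::make_unique<chunk>();
    chunk* tail = head.get();
    for (std::size_t i = 1; i < n; ++i) {
        tail->next = std::make_unique<chunk>();
        tail = tail->next.get();
    }
} // head goes out of scope here; the whole chain is freed iteratively
```

With a naive recursive destructor, a chain this long could overflow a small fiber stack; the iterative form completes regardless of length.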
Calle Wilund
04c19344de database: Fix use and assumptions about pending compactions
Fixes #934 - faulty assert in discard_sstables

run_with_compaction_disabled clears a CF out of the compaction
manager queue. discard_sstables wants to assert on this, but looks
at the wrong counters.

pending_compactions is an indicator of how much interested parties
want a CF compacted (again and again). It should not be taken as
an indicator of compactions actually being done.

This modifies the usage slightly so that:
1.) The counter is always incremented, even if compaction is disallowed.
    The counter's value at the end of run_with_compaction_disabled is
    then used as an indicator of whether a compaction should be
    re-triggered. (If compactions finished, it will be zero.)
2.) The use and purpose of the pending counter is documented, and a
    method is added to re-add a CF to compaction for r_w_c_d above.
3.) discard_sstables now asserts on the right things.

Message-Id: <1456332824-23349-1-git-send-email-calle@scylladb.com>
2016-03-03 10:51:27 +02:00
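The three points above amount to a small counter discipline: always record the want, submit only when allowed, and on re-enable use the pending counter to decide whether to re-trigger without incrementing again. A toy model with illustrative names:

```cpp
#include <cassert>

// Toy model of the #934 pending-compaction counter discipline.
struct cf_model {
    int pending_compactions = 0; // how much compaction is *wanted*
    int compaction_disabled = 0;
    int submitted = 0;           // jobs actually handed to the manager

    void trigger_compaction() {
        ++pending_compactions; // always record the want, even when locked out
        do_trigger_compaction();
    }
    void do_trigger_compaction() {
        if (compaction_disabled == 0) ++submitted; // only submit when allowed
    }
    void finish_one() { --pending_compactions; } // decremented only on success
    void disable() { ++compaction_disabled; }
    void enable() {
        // On re-enable, a nonzero pending counter means a request arrived
        // while disabled and must be re-triggered, without a second increment.
        if (--compaction_disabled == 0 && pending_compactions > 0)
            do_trigger_compaction();
    }
};
```

A request made while disabled is not submitted, but is replayed once compaction is re-enabled.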
Raphael S. Carvalho
df19e546f9 tests: sstable_test: submit compaction request through column family
That's needed for the reverted commit 9586793c to work. It's also the
correct thing to do, i.e. the column family submits itself to the manager.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <2a1d141ad929c1957933f57412083dd52af0390b.1456415398.git.raphaelsc@scylladb.com>
2016-03-03 10:51:23 +02:00
Takuya ASADA
b532919c55 dist: add posix_net_conf.sh on Ubuntu package
Fixes #881

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1455522990-32044-1-git-send-email-syuu@scylladb.com>
(cherry picked from commit fb3f4cc148)
2016-02-15 17:03:10 +02:00
Takuya ASADA
6ae6dcc2fc dist: switch AMI base image to 'CentOS7-Base2', uses CentOS official kernel
The previous CentOS base image accidentally used a non-standard kernel
from elrepo. This replaces the base image with a new one that contains
the CentOS default kernel.

Fixes #890

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1455398903-2865-1-git-send-email-syuu@scylladb.com>
(cherry picked from commit 3697cee76d)
2016-02-15 15:59:04 +02:00
Tomasz Grabiec
5716140a14 abstract_replication_strategy: Fix generation of token ranges
We can't move from the range in the loop because the moved-from
subject will be empty in all but the first iteration.

Fixes a crash during node startup:

  "Exiting on unhandled exception of type 'runtime_exception': runtime error: Invalid token. Should have size 8, has size 0"

Fixes update_cluster_layout_tests.py:TestUpdateClusterLayout.simple_add_node_1_test (and probably others)

Signed-off-by: Tomasz Grabiec <tgrabiec@scylladb.com>
(cherry picked from commit efdbc3d6d7)
2016-02-14 14:39:31 +02:00
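The bug is the classic move-in-a-loop mistake: moving from the same variable on every iteration leaves it empty after the first, which is why later endpoints received zero-size tokens. A minimal sketch (std::string standing in for the token range):

```cpp
#include <cassert>
#include <string>
#include <vector>

// Buggy: the first iteration moves r out, so every later endpoint
// receives a moved-from (empty in practice) value.
inline std::vector<std::string> fan_out_buggy(std::string r, int n) {
    std::vector<std::string> out;
    for (int i = 0; i < n; ++i) out.push_back(std::move(r));
    return out;
}

// Fixed: copy into each slot, as the diff below in
// abstract_replication_strategy does with ret.emplace(ep, r).
inline std::vector<std::string> fan_out_fixed(const std::string& r, int n) {
    std::vector<std::string> out;
    for (int i = 0; i < n; ++i) out.push_back(r);
    return out;
}
```

Note that a moved-from std::string is only "valid but unspecified", so the buggy variant's later elements can't even be relied upon to be empty.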
Avi Kivity
91cb9bae2e release: prepare for 0.18 2016-02-11 17:55:20 +02:00
12 changed files with 109 additions and 78 deletions

View File

@@ -1,6 +1,6 @@
#!/bin/sh
VERSION=666.development
VERSION=0.18.1
if test -f version
then

View File

@@ -42,6 +42,14 @@ private:
struct chunk {
// FIXME: group fragment pointers to reduce pointer chasing when packetizing
std::unique_ptr<chunk> next;
~chunk() {
auto p = std::move(next);
while (p) {
// Avoid recursion when freeing chunks
auto p_next = std::move(p->next);
p = std::move(p_next);
}
}
size_type offset; // Also means "size" after chunk is closed
size_type size;
value_type data[0];

View File

@@ -859,14 +859,25 @@ void column_family::start_compaction() {
void column_family::trigger_compaction() {
// Submitting compaction job to compaction manager.
// #934 - always inc the pending counter, to help
// indicate the want for compaction.
_stats.pending_compactions++;
do_trigger_compaction(); // see below
}
void column_family::do_trigger_compaction() {
// But only submit if we're not locked out
if (!_compaction_disabled) {
_stats.pending_compactions++;
_compaction_manager.submit(this);
}
}
future<> column_family::run_compaction(sstables::compaction_descriptor descriptor) {
assert(_stats.pending_compactions > 0);
return compact_sstables(std::move(descriptor)).then([this] {
// only do this on success. (no exceptions)
// in that case, we rely on it being still set
// for requeueing
_stats.pending_compactions--;
});
}
@@ -1082,7 +1093,13 @@ future<> database::populate_keyspace(sstring datadir, sstring ks_name) {
sstring uuidst = comps[1];
try {
auto&& uuid = find_uuid(ks_name, cfname);
auto&& uuid = [&] {
try {
return find_uuid(ks_name, cfname);
} catch (const std::out_of_range& e) {
std::throw_with_nested(no_such_column_family(ks_name, cfname));
}
}();
auto& cf = find_column_family(uuid);
// #870: Check that the directory name matches
@@ -2272,7 +2289,8 @@ void column_family::clear() {
// NOTE: does not need to be futurized, but might eventually, depending on
// if we implement notifications, whatnot.
future<db::replay_position> column_family::discard_sstables(db_clock::time_point truncated_at) {
assert(_stats.pending_compactions == 0);
assert(_compaction_disabled > 0);
assert(!compaction_manager_queued());
return with_lock(_sstables_lock.for_read(), [this, truncated_at] {
db::replay_position rp;

View File

@@ -206,6 +206,7 @@ private:
key_source sstables_as_key_source() const;
partition_presence_checker make_partition_presence_checker(lw_shared_ptr<sstable_list> old_sstables);
std::chrono::steady_clock::time_point _sstable_writes_disabled_at;
void do_trigger_compaction();
public:
// Creates a mutation reader which covers all data sources for this column family.
// Caller needs to ensure that column_family remains live (FIXME: relax this).
@@ -361,8 +362,12 @@ public:
Result run_with_compaction_disabled(Func && func) {
++_compaction_disabled;
return _compaction_manager.remove(this).then(std::forward<Func>(func)).finally([this] {
if (--_compaction_disabled == 0) {
trigger_compaction();
// #934. The pending counter is actually a great indicator of whether we
// actually need to trigger a compaction again.
if (--_compaction_disabled == 0 && _stats.pending_compactions > 0) {
// we're turning it on again; use the function that does not increment
// the counter further.
do_trigger_compaction();
}
});
}

View File

@@ -8,7 +8,7 @@
"security_group_id": "{{user `security_group_id`}}",
"region": "{{user `region`}}",
"associate_public_ip_address": "{{user `associate_public_ip_address`}}",
"source_ami": "ami-8ef1d6e4",
"source_ami": "ami-f3102499",
"user_data_file": "user_data.txt",
"instance_type": "{{user `instance_type`}}",
"ssh_username": "centos",

View File

@@ -40,7 +40,8 @@ override_dh_auto_install:
cp -r $(CURDIR)/licenses $(DOC)
mkdir -p $(SCRIPTS) && \
cp $(CURDIR)/seastar/dpdk/tools/dpdk_nic_bind.py $(SCRIPTS)
cp $(CURDIR)/seastar/scripts/dpdk_nic_bind.py $(SCRIPTS)
cp $(CURDIR)/seastar/scripts/posix_net_conf.sh $(SCRIPTS)
cp $(CURDIR)/dist/common/scripts/* $(SCRIPTS)
cp $(CURDIR)/dist/ubuntu/scripts/* $(SCRIPTS)

View File

@@ -168,12 +168,12 @@ abstract_replication_strategy::get_address_ranges(token_metadata& tm) const {
if (wrap) {
auto split_ranges = r.unwrap();
for (auto ep : eps) {
ret.emplace(ep, std::move(split_ranges.first));
ret.emplace(ep, std::move(split_ranges.second));
ret.emplace(ep, split_ranges.first);
ret.emplace(ep, split_ranges.second);
}
} else {
for (auto ep : eps) {
ret.emplace(ep, std::move(r));
ret.emplace(ep, r);
}
}
}
@@ -190,12 +190,12 @@ abstract_replication_strategy::get_range_addresses(token_metadata& tm) const {
if (wrap) {
auto split_ranges = r.unwrap();
for (auto ep : eps) {
ret.emplace(std::move(split_ranges.first), ep);
ret.emplace(std::move(split_ranges.second), ep);
ret.emplace(split_ranges.first, ep);
ret.emplace(split_ranges.second, ep);
}
} else {
for (auto ep : eps) {
ret.emplace(std::move(r), ep);
ret.emplace(r, ep);
}
}
}

View File

@@ -398,7 +398,7 @@ void token_metadata::calculate_pending_ranges(abstract_replication_strategy& str
// all leaving nodes are gone.
auto metadata = clone_only_token_map(); // don't do this in the loop! #7758
for (const auto& r : affected_ranges) {
auto t = r.end()->value();
auto t = r.end() ? r.end()->value() : dht::maximum_token();
auto current_endpoints = strategy.calculate_natural_endpoints(t, metadata);
auto new_endpoints = strategy.calculate_natural_endpoints(t, all_left_metadata);
std::vector<inet_address> diff;

View File

@@ -1390,11 +1390,6 @@ future<> storage_proxy::schedule_repair(std::unordered_map<gms::inet_address, st
}).finally([p = shared_from_this()] {});
}
class digest_mismatch_exception : public std::runtime_error {
public:
digest_mismatch_exception() : std::runtime_error("Digest mismatch") {}
};
class abstract_read_resolver {
protected:
db::consistency_level _cl;
@@ -1442,37 +1437,31 @@ public:
class digest_read_resolver : public abstract_read_resolver {
size_t _block_for;
size_t _cl_responses = 0;
promise<> _cl_promise; // cl is reached
promise<foreign_ptr<lw_shared_ptr<query::result>>, bool> _cl_promise; // cl is reached
bool _cl_reported = false;
std::vector<foreign_ptr<lw_shared_ptr<query::result>>> _data_results;
foreign_ptr<lw_shared_ptr<query::result>> _data_result;
std::vector<query::result_digest> _digest_results;
virtual void on_timeout() override {
if (_cl_responses < _block_for) {
_cl_promise.set_exception(read_timeout_exception(_cl, _cl_responses, _block_for, _data_results.size() != 0));
if (!_cl_reported) {
_cl_promise.set_exception(read_timeout_exception(_cl, _cl_responses, _block_for, _data_result));
}
// we will not need them any more
_data_results.clear();
_data_result = foreign_ptr<lw_shared_ptr<query::result>>();
_digest_results.clear();
}
virtual size_t response_count() const override {
return _digest_results.size();
}
bool digests_match() const {
assert(response_count());
if (response_count() == 1) {
return true;
}
auto& first = *_digest_results.begin();
return std::find_if(_digest_results.begin() + 1, _digest_results.end(), [&first] (query::result_digest digest) { return digest != first; }) == _digest_results.end();
}
public:
digest_read_resolver(db::consistency_level cl, size_t block_for, std::chrono::steady_clock::time_point timeout) : abstract_read_resolver(cl, 0, timeout), _block_for(block_for) {}
void add_data(gms::inet_address from, foreign_ptr<lw_shared_ptr<query::result>> result) {
if (!_timedout) {
// if only one target was queried digest_check() will be skipped so we can also skip digest calculation
_digest_results.emplace_back(_targets_count == 1 ? query::result_digest() : result->digest());
_data_results.emplace_back(std::move(result));
if (!_data_result) {
_data_result = std::move(result);
}
got_response(from);
}
}
@@ -1482,12 +1471,13 @@ public:
got_response(from);
}
}
foreign_ptr<lw_shared_ptr<query::result>> resolve() {
assert(_data_results.size());
if (!digests_match()) {
throw digest_mismatch_exception();
bool digests_match() const {
assert(response_count());
if (response_count() == 1) {
return true;
}
return std::move(*_data_results.begin());
auto& first = *_digest_results.begin();
return std::find_if(_digest_results.begin() + 1, _digest_results.end(), [&first] (query::result_digest digest) { return digest != first; }) == _digest_results.end();
}
bool waiting_for(gms::inet_address ep) {
return db::is_datacenter_local(_cl) ? is_me(ep) || db::is_local(ep) : true;
@@ -1497,9 +1487,9 @@ public:
if (waiting_for(ep)) {
_cl_responses++;
}
if (_cl_responses >= _block_for && _data_results.size()) {
if (_cl_responses >= _block_for && _data_result) {
_cl_reported = true;
_cl_promise.set_value();
_cl_promise.set_value(std::move(_data_result), digests_match());
}
}
if (is_completed()) {
@@ -1507,11 +1497,11 @@ public:
_done_promise.set_value();
}
}
future<> has_cl() {
future<foreign_ptr<lw_shared_ptr<query::result>>, bool> has_cl() {
return _cl_promise.get_future();
}
bool has_data() {
return _data_results.size() != 0;
return _data_result;
}
void add_wait_targets(size_t targets_count) {
_targets_count += targets_count;
@@ -1803,37 +1793,41 @@ public:
// hold on to executor until all queries are complete
});
digest_resolver->has_cl().then_wrapped([exec, digest_resolver, timeout] (future<> f) {
digest_resolver->has_cl().then_wrapped([exec, digest_resolver, timeout] (future<foreign_ptr<lw_shared_ptr<query::result>>, bool> f) {
try {
exec->got_cl();
f.get();
exec->_result_promise.set_value(digest_resolver->resolve()); // can throw digest mismatch exception
auto done = digest_resolver->done();
if (exec->_block_for < exec->_targets.size()) { // if there are more targets than needed for cl, check digest in background
exec->_proxy->_stats.background_reads++;
done.then_wrapped([exec, digest_resolver, timeout] (future<>&& f){
try {
f.get();
digest_resolver->resolve();
exec->_proxy->_stats.background_reads--;
} catch(digest_mismatch_exception& ex) {
exec->_proxy->_stats.read_repair_repaired_background++;
exec->_result_promise = promise<foreign_ptr<lw_shared_ptr<query::result>>>();
exec->reconcile(exec->_cl, timeout);
exec->_result_promise.get_future().then_wrapped([exec] (auto f) {
f.ignore_ready_future(); // ignore any failures during background repair
exec->_proxy->_stats.background_reads--;
});
} catch(...) {
// ignore all exception besides digest mismatch during background check
}
});
} else {
done.discard_result(); // no need for background check, discard done future explicitly
foreign_ptr<lw_shared_ptr<query::result>> result;
bool digests_match;
std::tie(result, digests_match) = f.get(); // can throw
if (digests_match) {
exec->_result_promise.set_value(std::move(result));
auto done = digest_resolver->done();
if (exec->_block_for < exec->_targets.size()) { // if there are more targets than needed for cl, check digest in background
exec->_proxy->_stats.background_reads++;
done.then_wrapped([exec, digest_resolver, timeout] (future<>&& f){
if (f.failed()) {
f.ignore_ready_future(); // ignore all exceptions besides digest mismatch during background check
} else {
if (!digest_resolver->digests_match()) {
exec->_proxy->_stats.read_repair_repaired_background++;
exec->_result_promise = promise<foreign_ptr<lw_shared_ptr<query::result>>>();
exec->reconcile(exec->_cl, timeout);
exec->_result_promise.get_future().then_wrapped([exec] (future<foreign_ptr<lw_shared_ptr<query::result>>> f) {
f.ignore_ready_future(); // ignore any failures during background repair
exec->_proxy->_stats.background_reads--;
});
} else {
exec->_proxy->_stats.background_reads--;
}
}
});
}
} else { // digest mismatch
exec->reconcile(exec->_cl, timeout);
exec->_proxy->_stats.read_repair_repaired_blocking++;
}
} catch (digest_mismatch_exception& ex) {
exec->reconcile(exec->_cl, timeout);
exec->_proxy->_stats.read_repair_repaired_blocking++;
} catch (read_timeout_exception& ex) {
exec->_result_promise.set_exception(ex);
}

View File

@@ -1968,7 +1968,8 @@ std::unordered_multimap<range<token>, inet_address> storage_service::get_changed
auto metadata = _token_metadata.clone_only_token_map(); // don't do this in the loop! #7758
for (auto& r : ranges) {
auto& ks = _db.local().find_keyspace(keyspace_name);
auto eps = ks.get_replication_strategy().calculate_natural_endpoints(r.end()->value(), metadata);
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
auto eps = ks.get_replication_strategy().calculate_natural_endpoints(end_token, metadata);
current_replica_endpoints.emplace(r, std::move(eps));
}
@@ -1989,7 +1990,8 @@ std::unordered_multimap<range<token>, inet_address> storage_service::get_changed
// range.
for (auto& r : ranges) {
auto& ks = _db.local().find_keyspace(keyspace_name);
auto new_replica_endpoints = ks.get_replication_strategy().calculate_natural_endpoints(r.end()->value(), temp);
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
auto new_replica_endpoints = ks.get_replication_strategy().calculate_natural_endpoints(end_token, temp);
auto rg = current_replica_endpoints.equal_range(r);
for (auto it = rg.first; it != rg.second; it++) {
@@ -2503,8 +2505,9 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_
if (r.contains(to_fetch, dht::token_comparator())) {
std::vector<inet_address> endpoints;
if (dht::range_streamer::use_strict_consistency()) {
auto end_token = to_fetch.end() ? to_fetch.end()->value() : dht::maximum_token();
std::vector<inet_address> old_endpoints = eps;
std::vector<inet_address> new_endpoints = strategy.calculate_natural_endpoints(to_fetch.end()->value(), token_meta_clone_all_settled);
std::vector<inet_address> new_endpoints = strategy.calculate_natural_endpoints(end_token, token_meta_clone_all_settled);
//Due to CASSANDRA-5953 we can have a higher RF than we have endpoints.
//So we need to be careful to only be strict when endpoints == RF
@@ -2563,8 +2566,9 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_
std::unordered_multimap<inet_address, range<token>> endpoint_ranges;
std::unordered_map<inet_address, std::vector<range<token>>> endpoint_ranges_map;
for (range<token> to_stream : ranges_per_keyspace.first) {
std::vector<inet_address> current_endpoints = strategy.calculate_natural_endpoints(to_stream.end()->value(), token_meta_clone);
std::vector<inet_address> new_endpoints = strategy.calculate_natural_endpoints(to_stream.end()->value(), token_meta_clone_all_settled);
auto end_token = to_stream.end() ? to_stream.end()->value() : dht::maximum_token();
std::vector<inet_address> current_endpoints = strategy.calculate_natural_endpoints(end_token, token_meta_clone);
std::vector<inet_address> new_endpoints = strategy.calculate_natural_endpoints(end_token, token_meta_clone_all_settled);
logger.debug("Range: {} Current endpoints: {} New endpoints: {}", to_stream, current_endpoints, new_endpoints);
std::sort(current_endpoints.begin(), current_endpoints.end());
std::sort(new_endpoints.begin(), new_endpoints.end());

View File

@@ -1019,7 +1019,7 @@ SEASTAR_TEST_CASE(compaction_manager_test) {
// were compacted.
BOOST_REQUIRE(cf->sstables_count() == generations->size());
cm->submit(&*cf);
cf->trigger_compaction();
BOOST_REQUIRE(cm->get_stats().pending_tasks == 1);
// wait for submitted job to finish.

View File

@@ -1059,7 +1059,8 @@ public:
if (!num) {
return 1;
}
return boost::multiprecision::cpp_int::canonical_value(num).size() * sizeof(boost::multiprecision::limb_type) + 1;
auto pnum = abs(num);
return align_up(boost::multiprecision::msb(pnum) + 2, 8u) / 8;
}
virtual int32_t compare(bytes_view v1, bytes_view v2) const override {
if (v1.empty()) {