Compare commits
14 Commits
next
...
scylla-0.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bf71575fd7 | ||
|
|
cd75075214 | ||
|
|
e85f11566b | ||
|
|
8f682f018e | ||
|
|
dba2b617e7 | ||
|
|
f4e11007cf | ||
|
|
fdfa1df395 | ||
|
|
116055cc6f | ||
|
|
04c19344de | ||
|
|
df19e546f9 | ||
|
|
b532919c55 | ||
|
|
6ae6dcc2fc | ||
|
|
5716140a14 | ||
|
|
91cb9bae2e |
@@ -1,6 +1,6 @@
|
||||
#!/bin/sh
|
||||
|
||||
VERSION=666.development
|
||||
VERSION=0.18.1
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -42,6 +42,14 @@ private:
|
||||
struct chunk {
|
||||
// FIXME: group fragment pointers to reduce pointer chasing when packetizing
|
||||
std::unique_ptr<chunk> next;
|
||||
~chunk() {
|
||||
auto p = std::move(next);
|
||||
while (p) {
|
||||
// Avoid recursion when freeing chunks
|
||||
auto p_next = std::move(p->next);
|
||||
p = std::move(p_next);
|
||||
}
|
||||
}
|
||||
size_type offset; // Also means "size" after chunk is closed
|
||||
size_type size;
|
||||
value_type data[0];
|
||||
|
||||
24
database.cc
24
database.cc
@@ -859,14 +859,25 @@ void column_family::start_compaction() {
|
||||
|
||||
void column_family::trigger_compaction() {
|
||||
// Submitting compaction job to compaction manager.
|
||||
// #934 - always inc the pending counter, to help
|
||||
// indicate the want for compaction.
|
||||
_stats.pending_compactions++;
|
||||
do_trigger_compaction(); // see below
|
||||
}
|
||||
|
||||
void column_family::do_trigger_compaction() {
|
||||
// But only submit if we're not locked out
|
||||
if (!_compaction_disabled) {
|
||||
_stats.pending_compactions++;
|
||||
_compaction_manager.submit(this);
|
||||
}
|
||||
}
|
||||
|
||||
future<> column_family::run_compaction(sstables::compaction_descriptor descriptor) {
|
||||
assert(_stats.pending_compactions > 0);
|
||||
return compact_sstables(std::move(descriptor)).then([this] {
|
||||
// only do this on success. (no exceptions)
|
||||
// in that case, we rely on it being still set
|
||||
// for reqeueuing
|
||||
_stats.pending_compactions--;
|
||||
});
|
||||
}
|
||||
@@ -1082,7 +1093,13 @@ future<> database::populate_keyspace(sstring datadir, sstring ks_name) {
|
||||
sstring uuidst = comps[1];
|
||||
|
||||
try {
|
||||
auto&& uuid = find_uuid(ks_name, cfname);
|
||||
auto&& uuid = [&] {
|
||||
try {
|
||||
return find_uuid(ks_name, cfname);
|
||||
} catch (const std::out_of_range& e) {
|
||||
std::throw_with_nested(no_such_column_family(ks_name, cfname));
|
||||
}
|
||||
}();
|
||||
auto& cf = find_column_family(uuid);
|
||||
|
||||
// #870: Check that the directory name matches
|
||||
@@ -2272,7 +2289,8 @@ void column_family::clear() {
|
||||
// NOTE: does not need to be futurized, but might eventually, depending on
|
||||
// if we implement notifications, whatnot.
|
||||
future<db::replay_position> column_family::discard_sstables(db_clock::time_point truncated_at) {
|
||||
assert(_stats.pending_compactions == 0);
|
||||
assert(_compaction_disabled > 0);
|
||||
assert(!compaction_manager_queued());
|
||||
|
||||
return with_lock(_sstables_lock.for_read(), [this, truncated_at] {
|
||||
db::replay_position rp;
|
||||
|
||||
@@ -206,6 +206,7 @@ private:
|
||||
key_source sstables_as_key_source() const;
|
||||
partition_presence_checker make_partition_presence_checker(lw_shared_ptr<sstable_list> old_sstables);
|
||||
std::chrono::steady_clock::time_point _sstable_writes_disabled_at;
|
||||
void do_trigger_compaction();
|
||||
public:
|
||||
// Creates a mutation reader which covers all data sources for this column family.
|
||||
// Caller needs to ensure that column_family remains live (FIXME: relax this).
|
||||
@@ -361,8 +362,12 @@ public:
|
||||
Result run_with_compaction_disabled(Func && func) {
|
||||
++_compaction_disabled;
|
||||
return _compaction_manager.remove(this).then(std::forward<Func>(func)).finally([this] {
|
||||
if (--_compaction_disabled == 0) {
|
||||
trigger_compaction();
|
||||
// #934. The pending counter is actually a great indicator into whether we
|
||||
// actually need to trigger a compaction again.
|
||||
if (--_compaction_disabled == 0 && _stats.pending_compactions > 0) {
|
||||
// we're turning if on again, use function that does not increment
|
||||
// the counter further.
|
||||
do_trigger_compaction();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
2
dist/ami/scylla.json
vendored
2
dist/ami/scylla.json
vendored
@@ -8,7 +8,7 @@
|
||||
"security_group_id": "{{user `security_group_id`}}",
|
||||
"region": "{{user `region`}}",
|
||||
"associate_public_ip_address": "{{user `associate_public_ip_address`}}",
|
||||
"source_ami": "ami-8ef1d6e4",
|
||||
"source_ami": "ami-f3102499",
|
||||
"user_data_file": "user_data.txt",
|
||||
"instance_type": "{{user `instance_type`}}",
|
||||
"ssh_username": "centos",
|
||||
|
||||
3
dist/ubuntu/rules.in
vendored
3
dist/ubuntu/rules.in
vendored
@@ -40,7 +40,8 @@ override_dh_auto_install:
|
||||
cp -r $(CURDIR)/licenses $(DOC)
|
||||
|
||||
mkdir -p $(SCRIPTS) && \
|
||||
cp $(CURDIR)/seastar/dpdk/tools/dpdk_nic_bind.py $(SCRIPTS)
|
||||
cp $(CURDIR)/seastar/scripts/dpdk_nic_bind.py $(SCRIPTS)
|
||||
cp $(CURDIR)/seastar/scripts/posix_net_conf.sh $(SCRIPTS)
|
||||
cp $(CURDIR)/dist/common/scripts/* $(SCRIPTS)
|
||||
cp $(CURDIR)/dist/ubuntu/scripts/* $(SCRIPTS)
|
||||
|
||||
|
||||
@@ -168,12 +168,12 @@ abstract_replication_strategy::get_address_ranges(token_metadata& tm) const {
|
||||
if (wrap) {
|
||||
auto split_ranges = r.unwrap();
|
||||
for (auto ep : eps) {
|
||||
ret.emplace(ep, std::move(split_ranges.first));
|
||||
ret.emplace(ep, std::move(split_ranges.second));
|
||||
ret.emplace(ep, split_ranges.first);
|
||||
ret.emplace(ep, split_ranges.second);
|
||||
}
|
||||
} else {
|
||||
for (auto ep : eps) {
|
||||
ret.emplace(ep, std::move(r));
|
||||
ret.emplace(ep, r);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -190,12 +190,12 @@ abstract_replication_strategy::get_range_addresses(token_metadata& tm) const {
|
||||
if (wrap) {
|
||||
auto split_ranges = r.unwrap();
|
||||
for (auto ep : eps) {
|
||||
ret.emplace(std::move(split_ranges.first), ep);
|
||||
ret.emplace(std::move(split_ranges.second), ep);
|
||||
ret.emplace(split_ranges.first, ep);
|
||||
ret.emplace(split_ranges.second, ep);
|
||||
}
|
||||
} else {
|
||||
for (auto ep : eps) {
|
||||
ret.emplace(std::move(r), ep);
|
||||
ret.emplace(r, ep);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -398,7 +398,7 @@ void token_metadata::calculate_pending_ranges(abstract_replication_strategy& str
|
||||
// all leaving nodes are gone.
|
||||
auto metadata = clone_only_token_map(); // don't do this in the loop! #7758
|
||||
for (const auto& r : affected_ranges) {
|
||||
auto t = r.end()->value();
|
||||
auto t = r.end() ? r.end()->value() : dht::maximum_token();
|
||||
auto current_endpoints = strategy.calculate_natural_endpoints(t, metadata);
|
||||
auto new_endpoints = strategy.calculate_natural_endpoints(t, all_left_metadata);
|
||||
std::vector<inet_address> diff;
|
||||
|
||||
@@ -1390,11 +1390,6 @@ future<> storage_proxy::schedule_repair(std::unordered_map<gms::inet_address, st
|
||||
}).finally([p = shared_from_this()] {});
|
||||
}
|
||||
|
||||
class digest_mismatch_exception : public std::runtime_error {
|
||||
public:
|
||||
digest_mismatch_exception() : std::runtime_error("Digest mismatch") {}
|
||||
};
|
||||
|
||||
class abstract_read_resolver {
|
||||
protected:
|
||||
db::consistency_level _cl;
|
||||
@@ -1442,37 +1437,31 @@ public:
|
||||
class digest_read_resolver : public abstract_read_resolver {
|
||||
size_t _block_for;
|
||||
size_t _cl_responses = 0;
|
||||
promise<> _cl_promise; // cl is reached
|
||||
promise<foreign_ptr<lw_shared_ptr<query::result>>, bool> _cl_promise; // cl is reached
|
||||
bool _cl_reported = false;
|
||||
std::vector<foreign_ptr<lw_shared_ptr<query::result>>> _data_results;
|
||||
foreign_ptr<lw_shared_ptr<query::result>> _data_result;
|
||||
std::vector<query::result_digest> _digest_results;
|
||||
|
||||
virtual void on_timeout() override {
|
||||
if (_cl_responses < _block_for) {
|
||||
_cl_promise.set_exception(read_timeout_exception(_cl, _cl_responses, _block_for, _data_results.size() != 0));
|
||||
if (!_cl_reported) {
|
||||
_cl_promise.set_exception(read_timeout_exception(_cl, _cl_responses, _block_for, _data_result));
|
||||
}
|
||||
// we will not need them any more
|
||||
_data_results.clear();
|
||||
_data_result = foreign_ptr<lw_shared_ptr<query::result>>();
|
||||
_digest_results.clear();
|
||||
}
|
||||
virtual size_t response_count() const override {
|
||||
return _digest_results.size();
|
||||
}
|
||||
bool digests_match() const {
|
||||
assert(response_count());
|
||||
if (response_count() == 1) {
|
||||
return true;
|
||||
}
|
||||
auto& first = *_digest_results.begin();
|
||||
return std::find_if(_digest_results.begin() + 1, _digest_results.end(), [&first] (query::result_digest digest) { return digest != first; }) == _digest_results.end();
|
||||
}
|
||||
public:
|
||||
digest_read_resolver(db::consistency_level cl, size_t block_for, std::chrono::steady_clock::time_point timeout) : abstract_read_resolver(cl, 0, timeout), _block_for(block_for) {}
|
||||
void add_data(gms::inet_address from, foreign_ptr<lw_shared_ptr<query::result>> result) {
|
||||
if (!_timedout) {
|
||||
// if only one target was queried digest_check() will be skipped so we can also skip digest calculation
|
||||
_digest_results.emplace_back(_targets_count == 1 ? query::result_digest() : result->digest());
|
||||
_data_results.emplace_back(std::move(result));
|
||||
if (!_data_result) {
|
||||
_data_result = std::move(result);
|
||||
}
|
||||
got_response(from);
|
||||
}
|
||||
}
|
||||
@@ -1482,12 +1471,13 @@ public:
|
||||
got_response(from);
|
||||
}
|
||||
}
|
||||
foreign_ptr<lw_shared_ptr<query::result>> resolve() {
|
||||
assert(_data_results.size());
|
||||
if (!digests_match()) {
|
||||
throw digest_mismatch_exception();
|
||||
bool digests_match() const {
|
||||
assert(response_count());
|
||||
if (response_count() == 1) {
|
||||
return true;
|
||||
}
|
||||
return std::move(*_data_results.begin());
|
||||
auto& first = *_digest_results.begin();
|
||||
return std::find_if(_digest_results.begin() + 1, _digest_results.end(), [&first] (query::result_digest digest) { return digest != first; }) == _digest_results.end();
|
||||
}
|
||||
bool waiting_for(gms::inet_address ep) {
|
||||
return db::is_datacenter_local(_cl) ? is_me(ep) || db::is_local(ep) : true;
|
||||
@@ -1497,9 +1487,9 @@ public:
|
||||
if (waiting_for(ep)) {
|
||||
_cl_responses++;
|
||||
}
|
||||
if (_cl_responses >= _block_for && _data_results.size()) {
|
||||
if (_cl_responses >= _block_for && _data_result) {
|
||||
_cl_reported = true;
|
||||
_cl_promise.set_value();
|
||||
_cl_promise.set_value(std::move(_data_result), digests_match());
|
||||
}
|
||||
}
|
||||
if (is_completed()) {
|
||||
@@ -1507,11 +1497,11 @@ public:
|
||||
_done_promise.set_value();
|
||||
}
|
||||
}
|
||||
future<> has_cl() {
|
||||
future<foreign_ptr<lw_shared_ptr<query::result>>, bool> has_cl() {
|
||||
return _cl_promise.get_future();
|
||||
}
|
||||
bool has_data() {
|
||||
return _data_results.size() != 0;
|
||||
return _data_result;
|
||||
}
|
||||
void add_wait_targets(size_t targets_count) {
|
||||
_targets_count += targets_count;
|
||||
@@ -1803,37 +1793,41 @@ public:
|
||||
// hold on to executor until all queries are complete
|
||||
});
|
||||
|
||||
digest_resolver->has_cl().then_wrapped([exec, digest_resolver, timeout] (future<> f) {
|
||||
digest_resolver->has_cl().then_wrapped([exec, digest_resolver, timeout] (future<foreign_ptr<lw_shared_ptr<query::result>>, bool> f) {
|
||||
try {
|
||||
exec->got_cl();
|
||||
f.get();
|
||||
exec->_result_promise.set_value(digest_resolver->resolve()); // can throw digest missmatch exception
|
||||
auto done = digest_resolver->done();
|
||||
if (exec->_block_for < exec->_targets.size()) { // if there are more targets then needed for cl, check digest in background
|
||||
exec->_proxy->_stats.background_reads++;
|
||||
done.then_wrapped([exec, digest_resolver, timeout] (future<>&& f){
|
||||
try {
|
||||
f.get();
|
||||
digest_resolver->resolve();
|
||||
exec->_proxy->_stats.background_reads--;
|
||||
} catch(digest_mismatch_exception& ex) {
|
||||
exec->_proxy->_stats.read_repair_repaired_background++;
|
||||
exec->_result_promise = promise<foreign_ptr<lw_shared_ptr<query::result>>>();
|
||||
exec->reconcile(exec->_cl, timeout);
|
||||
exec->_result_promise.get_future().then_wrapped([exec] (auto f) {
|
||||
f.ignore_ready_future(); // ignore any failures during background repair
|
||||
exec->_proxy->_stats.background_reads--;
|
||||
});
|
||||
} catch(...) {
|
||||
// ignore all exception besides digest mismatch during background check
|
||||
}
|
||||
});
|
||||
} else {
|
||||
done.discard_result(); // no need for background check, discard done future explicitly
|
||||
|
||||
foreign_ptr<lw_shared_ptr<query::result>> result;
|
||||
bool digests_match;
|
||||
std::tie(result, digests_match) = f.get(); // can throw
|
||||
|
||||
if (digests_match) {
|
||||
exec->_result_promise.set_value(std::move(result));
|
||||
auto done = digest_resolver->done();
|
||||
if (exec->_block_for < exec->_targets.size()) { // if there are more targets then needed for cl, check digest in background
|
||||
exec->_proxy->_stats.background_reads++;
|
||||
done.then_wrapped([exec, digest_resolver, timeout] (future<>&& f){
|
||||
if (f.failed()) {
|
||||
f.ignore_ready_future(); // ignore all exception besides digest mismatch during background check
|
||||
} else {
|
||||
if (!digest_resolver->digests_match()) {
|
||||
exec->_proxy->_stats.read_repair_repaired_background++;
|
||||
exec->_result_promise = promise<foreign_ptr<lw_shared_ptr<query::result>>>();
|
||||
exec->reconcile(exec->_cl, timeout);
|
||||
exec->_result_promise.get_future().then_wrapped([exec] (future<foreign_ptr<lw_shared_ptr<query::result>>> f) {
|
||||
f.ignore_ready_future(); // ignore any failures during background repair
|
||||
exec->_proxy->_stats.background_reads--;
|
||||
});
|
||||
} else {
|
||||
exec->_proxy->_stats.background_reads--;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
} else { // digest missmatch
|
||||
exec->reconcile(exec->_cl, timeout);
|
||||
exec->_proxy->_stats.read_repair_repaired_blocking++;
|
||||
}
|
||||
} catch (digest_mismatch_exception& ex) {
|
||||
exec->reconcile(exec->_cl, timeout);
|
||||
exec->_proxy->_stats.read_repair_repaired_blocking++;
|
||||
} catch (read_timeout_exception& ex) {
|
||||
exec->_result_promise.set_exception(ex);
|
||||
}
|
||||
|
||||
@@ -1968,7 +1968,8 @@ std::unordered_multimap<range<token>, inet_address> storage_service::get_changed
|
||||
auto metadata = _token_metadata.clone_only_token_map(); // don't do this in the loop! #7758
|
||||
for (auto& r : ranges) {
|
||||
auto& ks = _db.local().find_keyspace(keyspace_name);
|
||||
auto eps = ks.get_replication_strategy().calculate_natural_endpoints(r.end()->value(), metadata);
|
||||
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
|
||||
auto eps = ks.get_replication_strategy().calculate_natural_endpoints(end_token, metadata);
|
||||
current_replica_endpoints.emplace(r, std::move(eps));
|
||||
}
|
||||
|
||||
@@ -1989,7 +1990,8 @@ std::unordered_multimap<range<token>, inet_address> storage_service::get_changed
|
||||
// range.
|
||||
for (auto& r : ranges) {
|
||||
auto& ks = _db.local().find_keyspace(keyspace_name);
|
||||
auto new_replica_endpoints = ks.get_replication_strategy().calculate_natural_endpoints(r.end()->value(), temp);
|
||||
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
|
||||
auto new_replica_endpoints = ks.get_replication_strategy().calculate_natural_endpoints(end_token, temp);
|
||||
|
||||
auto rg = current_replica_endpoints.equal_range(r);
|
||||
for (auto it = rg.first; it != rg.second; it++) {
|
||||
@@ -2503,8 +2505,9 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_
|
||||
if (r.contains(to_fetch, dht::token_comparator())) {
|
||||
std::vector<inet_address> endpoints;
|
||||
if (dht::range_streamer::use_strict_consistency()) {
|
||||
auto end_token = to_fetch.end() ? to_fetch.end()->value() : dht::maximum_token();
|
||||
std::vector<inet_address> old_endpoints = eps;
|
||||
std::vector<inet_address> new_endpoints = strategy.calculate_natural_endpoints(to_fetch.end()->value(), token_meta_clone_all_settled);
|
||||
std::vector<inet_address> new_endpoints = strategy.calculate_natural_endpoints(end_token, token_meta_clone_all_settled);
|
||||
|
||||
//Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
|
||||
//So we need to be careful to only be strict when endpoints == RF
|
||||
@@ -2563,8 +2566,9 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_
|
||||
std::unordered_multimap<inet_address, range<token>> endpoint_ranges;
|
||||
std::unordered_map<inet_address, std::vector<range<token>>> endpoint_ranges_map;
|
||||
for (range<token> to_stream : ranges_per_keyspace.first) {
|
||||
std::vector<inet_address> current_endpoints = strategy.calculate_natural_endpoints(to_stream.end()->value(), token_meta_clone);
|
||||
std::vector<inet_address> new_endpoints = strategy.calculate_natural_endpoints(to_stream.end()->value(), token_meta_clone_all_settled);
|
||||
auto end_token = to_stream.end() ? to_stream.end()->value() : dht::maximum_token();
|
||||
std::vector<inet_address> current_endpoints = strategy.calculate_natural_endpoints(end_token, token_meta_clone);
|
||||
std::vector<inet_address> new_endpoints = strategy.calculate_natural_endpoints(end_token, token_meta_clone_all_settled);
|
||||
logger.debug("Range: {} Current endpoints: {} New endpoints: {}", to_stream, current_endpoints, new_endpoints);
|
||||
std::sort(current_endpoints.begin(), current_endpoints.end());
|
||||
std::sort(new_endpoints.begin(), new_endpoints.end());
|
||||
|
||||
@@ -1019,7 +1019,7 @@ SEASTAR_TEST_CASE(compaction_manager_test) {
|
||||
// were compacted.
|
||||
|
||||
BOOST_REQUIRE(cf->sstables_count() == generations->size());
|
||||
cm->submit(&*cf);
|
||||
cf->trigger_compaction();
|
||||
BOOST_REQUIRE(cm->get_stats().pending_tasks == 1);
|
||||
|
||||
// wait for submitted job to finish.
|
||||
|
||||
3
types.cc
3
types.cc
@@ -1059,7 +1059,8 @@ public:
|
||||
if (!num) {
|
||||
return 1;
|
||||
}
|
||||
return boost::multiprecision::cpp_int::canonical_value(num).size() * sizeof(boost::multiprecision::limb_type) + 1;
|
||||
auto pnum = abs(num);
|
||||
return align_up(boost::multiprecision::msb(pnum) + 2, 8u) / 8;
|
||||
}
|
||||
virtual int32_t compare(bytes_view v1, bytes_view v2) const override {
|
||||
if (v1.empty()) {
|
||||
|
||||
Reference in New Issue
Block a user