Merge branch 'dev/gleb/for_urchin' from seastar-dev.git

Beginning of read implementation for range queries from Gleb.
Tomasz Grabiec
2015-07-15 12:53:43 +03:00
6 changed files with 555 additions and 416 deletions

View File

@@ -145,12 +145,18 @@ bool operator<(const token& t1, const token& t2)
}
std::ostream& operator<<(std::ostream& out, const token& t) {
auto flags = out.flags();
for (auto c : t._data) {
unsigned char x = c;
out << std::hex << std::setw(2) << std::setfill('0') << +x << " ";
if (t._kind == token::kind::after_all_keys) {
out << "maximum token";
} else if (t._kind == token::kind::before_all_keys) {
out << "minimum token";
} else {
auto flags = out.flags();
for (auto c : t._data) {
unsigned char x = c;
out << std::hex << std::setw(2) << std::setfill('0') << +x << " ";
}
out.flags(flags);
}
out.flags(flags);
return out;
}
@@ -286,4 +292,14 @@ unsigned shard_of(const token& t) {
return v % smp::count;
}
int ring_position_comparator::operator()(const ring_position& lh, const ring_position& rh) const {
if (lh.less_compare(s, rh)) {
return -1;
} else if (lh.equal(s, rh)) {
return 0;
} else {
return 1;
}
}
}
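Note on the printing branch above: std::hex and std::setfill are sticky stream manipulators, so the code snapshots out.flags() and restores it afterwards (std::setw, by contrast, resets after each item). A minimal standalone sketch of the same save/restore pattern; the dump_hex helper is illustrative, not part of the patch:

#include <iomanip>
#include <iostream>
#include <vector>

// Print a byte sequence as zero-padded hex without leaking
// std::hex/std::setfill into later output on the same stream.
void dump_hex(std::ostream& out, const std::vector<char>& data) {
    auto flags = out.flags();          // capture current formatting state
    for (auto c : data) {
        unsigned char x = c;           // avoid sign-extension of char
        out << std::hex << std::setw(2) << std::setfill('0') << +x << " ";
    }
    out.flags(flags);                  // restore state for subsequent output
}

int main() {
    dump_hex(std::cout, {0x0a, 0x7f, 0x00});
    std::cout << 255 << "\n";          // still prints decimal: 255
}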

View File

@@ -269,6 +269,26 @@ public:
return { _token, *_key };
}
bool equal(const schema& s, const ring_position& lhr) const {
if (_token != lhr._token) {
return false;
} else if (!_key || !lhr._key) {
return true; // empty key "matches" any other key
} else {
return _key->legacy_equal(s, *lhr._key);
}
}
bool less_compare(const schema& s, const ring_position& lhr) const {
if (_token != lhr._token) {
return _token < lhr._token;
} else if (!_key || !lhr._key) {
return false;
} else {
return _key->legacy_tri_compare(s, *lhr._key) < 0;
}
}
size_t serialized_size() const;
void serialize(bytes::iterator& out) const;
static ring_position deserialize(bytes_view& in);
@@ -276,6 +296,11 @@ public:
friend std::ostream& operator<<(std::ostream&, const ring_position&);
};
struct ring_position_comparator {
const schema& s;
ring_position_comparator(const schema& s_) : s(s_) {}
int operator()(const ring_position& lh, const ring_position& rh) const;
};
std::ostream& operator<<(std::ostream& out, const token& t);
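The comparison semantics above are subtle: a ring_position with no key acts as a token-only bound, so equal() treats a missing key as matching any key at the same token, and less_compare() refuses to order two positions when either key is missing. A simplified sketch of the resulting three-way comparison, using int tokens and std::optional<int> keys as stand-ins (hypothetical types, not the patch's):

#include <optional>

struct pos { int token; std::optional<int> key; };

// Mirrors ring_position_comparator: compare token first, then key,
// where a missing key "matches" any key at the same token.
int tri_compare(const pos& a, const pos& b) {
    if (a.token != b.token) {
        return a.token < b.token ? -1 : 1;
    }
    if (!a.key || !b.key) {
        return 0; // token-only bound: matches any key with this token
    }
    if (*a.key != *b.key) {
        return *a.key < *b.key ? -1 : 1;
    }
    return 0;
}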

View File

@@ -122,39 +122,50 @@ private:
public std::iterator<std::input_iterator_tag, token> {
private:
tokens_iterator(std::vector<token>::const_iterator it, size_t pos)
: _cur_it(it), _ring_pos(pos) {}
: _cur_it(it), _ring_pos(pos), _insert_min(false) {}
public:
tokens_iterator(const token& start, token_metadata* token_metadata)
tokens_iterator(const token& start, token_metadata* token_metadata, bool include_min = false)
: _token_metadata(token_metadata) {
_cur_it = _token_metadata->sorted_tokens().begin() +
_token_metadata->first_token_index(start);
_cur_it = _token_metadata->sorted_tokens().begin() + _token_metadata->first_token_index(start);
_insert_min = include_min && *_token_metadata->sorted_tokens().begin() != dht::minimum_token();
if (_token_metadata->sorted_tokens().empty()) {
_min = true;
}
}
bool operator==(const tokens_iterator& it) const {
return _cur_it == it._cur_it;
return _min == it._min && _cur_it == it._cur_it;
}
bool operator!=(const tokens_iterator& it) const {
return _cur_it != it._cur_it;
return _min != it._min || _cur_it != it._cur_it;
}
const token& operator*() {
return *_cur_it;
if (_min) {
return _min_token;
} else {
return *_cur_it;
}
}
tokens_iterator& operator++() {
if (_ring_pos >= _token_metadata->sorted_tokens().size()) {
_cur_it = _token_metadata->sorted_tokens().end();
} else {
++_cur_it;
++_ring_pos;
if (!_min) {
if (_ring_pos >= _token_metadata->sorted_tokens().size()) {
_cur_it = _token_metadata->sorted_tokens().end();
} else {
++_cur_it;
++_ring_pos;
if (_cur_it == _token_metadata->sorted_tokens().end()) {
_cur_it = _token_metadata->sorted_tokens().begin();
if (_cur_it == _token_metadata->sorted_tokens().end()) {
_cur_it = _token_metadata->sorted_tokens().begin();
_min = _insert_min;
}
}
} else {
_min = false;
}
return *this;
}
@@ -165,6 +176,9 @@ private:
// "start"
//
size_t _ring_pos = 0;
bool _insert_min;
bool _min = false;
const token _min_token = dht::minimum_token();
token_metadata* _token_metadata = nullptr;
friend class token_metadata;
@@ -208,8 +222,8 @@ public:
*
* @return The requested range (see the description above)
*/
auto ring_range(const token& start) {
auto begin = tokens_iterator(start, this);
auto ring_range(const token& start, bool include_min = false) {
auto begin = tokens_iterator(start, this, include_min);
auto end = tokens_end();
return boost::make_iterator_range(begin, end);
}
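With include_min set, the iterator walks the sorted tokens from start, wraps past the end of the vector, and emits one synthetic minimum token at the wrap point unless the ring already owns it. A self-contained sketch of the traversal order over plain ints (the real iterator is lazy rather than materializing a vector; names here are illustrative):

#include <algorithm>
#include <cstddef>
#include <vector>

// Tokens visited from `start` onward, wrapping once around the ring,
// optionally inserting a synthetic minimum token at the wrap point.
std::vector<int> ring_order(const std::vector<int>& sorted, int start,
                            bool include_min, int min_token = -1) {
    std::vector<int> out;
    if (sorted.empty()) {
        return out;
    }
    auto first = std::lower_bound(sorted.begin(), sorted.end(), start);
    std::size_t begin = first - sorted.begin();
    for (std::size_t i = 0; i < sorted.size(); ++i) {
        std::size_t idx = (begin + i) % sorted.size();
        if (idx == 0 && i != 0 && include_min && sorted.front() != min_token) {
            out.push_back(min_token);   // synthetic minimum at the wrap
        }
        out.push_back(sorted[idx]);
    }
    return out;
}
// ring_order({10, 20, 30}, 15, true) -> {20, 30, -1, 10}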

View File

@@ -41,6 +41,37 @@ public:
, _singular(true)
{ }
range() : range({}, {}) {}
private:
// the point is before the range (works only for non-wrapped ranges)
template<typename Comparator>
bool before(const T& point, Comparator&& cmp) const {
if (!_start) {
return false; //open start, no points before
}
auto r = cmp(point, start_value());
if (r < 0) {
return true;
}
if (!_start->is_inclusive() && r == 0) {
return true;
}
return false;
}
// the point is after the range (works only for non-wrapped ranges)
template<typename Comparator>
bool after(const T& point, Comparator&& cmp) const {
if (!_end) {
return false; //open end, no points after
}
auto r = cmp(end_value(), point);
if (r < 0) {
return true;
}
if (!_end->is_inclusive() && r == 0) {
return true;
}
return false;
}
public:
static range make(bound start, bound end) {
return range({std::move(start)}, {std::move(end)});
@@ -74,15 +105,40 @@ public:
const T& end_value() const {
return _end->value();
}
const optional<bound>& start() const {
return _start;
}
const optional<bound>& end() const {
return _end;
}
// end is smaller than start
template<typename Comparator>
bool is_wrap_around(Comparator&& cmp) const {
if (_end && _start) {
return cmp(end_value(), start_value()) < 0;
} else {
return false; // open ended range never wraps around
}
}
// the point is inside the range
template<typename Comparator>
bool contains(const T& point, Comparator&& cmp) const {
if (is_wrap_around(cmp)) {
// wrapped range contains point if reverse does not contain it
return !range::make({end_value(), !_end->is_inclusive()}, {start_value(), !_start->is_inclusive()}).contains(point, cmp);
} else {
return !before(point, cmp) && !after(point, cmp);
}
}
// split range in two around a split_point. split_point has to be inside the range
// split_point will belong to first range
template<typename Comparator>
std::pair<range<T>, range<T>> split(const T& split_point, Comparator&& cmp) const {
assert(contains(split_point, std::forward<Comparator>(cmp)));
range left(_start, bound(split_point));
range right(bound(split_point, false), _end);
return std::make_pair(std::move(left), std::move(right));
}
// Transforms this range into a new range of a different value type
// Supplied transformer should transform value of type T (the old type) into value of type U (the new type).
template<typename U, typename Transformer>
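The pieces above compose as follows: is_wrap_around() detects end < start, contains() handles the wrapped case by testing the complement range, and split() cuts at a point the caller has already proven to be inside, with the split point landing in the left half. A reduced model using inclusive int bounds only (the real range<T> also supports open and exclusive bounds):

#include <cassert>
#include <utility>

// Simplified model of range<T>: inclusive int bounds only.
struct int_range { int start, end; };

bool wraps(const int_range& r) { return r.end < r.start; }

bool contains(const int_range& r, int p) {
    if (wraps(r)) {
        // A wrapped range covers everything its complement does not.
        return !(p > r.end && p < r.start);
    }
    return r.start <= p && p <= r.end;
}

// Split at a point known to be inside; the point goes to the left half.
// `p + 1` models bound(split_point, false), i.e. an exclusive start.
std::pair<int_range, int_range> split(const int_range& r, int p) {
    assert(contains(r, p));
    return {{r.start, p}, {p + 1, r.end}};
}
// contains({90, 10}, 95) == true      (wrap-around range)
// split({0, 50}, 20) -> {0, 20} and {21, 50}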

View File

@@ -1076,7 +1076,7 @@ future<> storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type r
auto all = boost::range::join(local, dc_groups);
// OK, now send and/or apply locally
return parallel_for_each(all.begin(), all.end(), [response_id, &m, this] (typename decltype(dc_groups)::value_type& dc_targets) mutable {
return parallel_for_each(all.begin(), all.end(), [response_id, &m, this] (typename decltype(dc_groups)::value_type& dc_targets) {
auto my_address = utils::fb_utilities::get_broadcast_address();
auto& forward = dc_targets.second;
@@ -1089,9 +1089,10 @@ future<> storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type r
got_response(response_id, my_address);
});
} else {
auto response_id_ = response_id; // make local copy since capture is reused
auto& ms = net::get_local_messaging_service();
return ms.send_message_oneway(net::messaging_verb::MUTATION, net::messaging_service::shard_id{coordinator, 0}, m, std::move(forward), std::move(my_address),
engine().cpu_id(), std::move(response_id));
engine().cpu_id(), std::move(response_id_));
}
}).finally([mptr] {
// make mutation alive until it is sent or processed locally, otherwise it
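The response_id_ copy above fixes a reuse bug: parallel_for_each invokes the same lambda once per element, so std::move(response_id) out of the shared capture would hand every iteration after the first a moved-from value. The pitfall and the fix in miniature (plain standard C++, not Seastar):

#include <iostream>
#include <string>
#include <vector>

int main() {
    std::string id = "req-42";
    std::vector<int> targets{1, 2, 3};
    // One lambda object is reused for every element; moving from the
    // capture would empty `id` after the first call. Copy per call instead.
    auto send = [&id](int target) {
        auto id_copy = id;              // local copy, safe to move from
        std::cout << "send " << std::move(id_copy) << " to " << target << "\n";
    };
    for (int t : targets) send(t);
}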
@@ -1356,53 +1357,124 @@ public:
digest_mismatch_exception() : std::runtime_error("Digest mismatch") {}
};
class abstract_read_executor : public enable_shared_from_this<abstract_read_executor> {
using targets_iterator = std::vector<gms::inet_address>::iterator;
class read_timeout_exception : public std::runtime_error {
public:
read_timeout_exception() : std::runtime_error("Read operation timed out") {}
};
storage_proxy& _proxy;
keyspace& _ks;
lw_shared_ptr<query::read_command> _cmd;
query::partition_range _partition_range;
class abstract_read_resolver {
protected:
size_t _targets_count;
promise<> _done_promise; // all target responded
public:
abstract_read_resolver(size_t target_count) : _targets_count(target_count) {}
virtual ~abstract_read_resolver() {}
future<> done() {
return _done_promise.get_future();
}
};
class digest_read_resolver : public abstract_read_resolver {
db::consistency_level _cl;
std::vector<gms::inet_address> _targets;
promise<foreign_ptr<lw_shared_ptr<query::result>>> _result_promise;
bool _done = false; // becomes true when promise above is fulfilled (operation may still continue)
size_t _block_for;
size_t _cl_responses = 0;
promise<> _cl_promise; // cl is reached
std::vector<foreign_ptr<lw_shared_ptr<query::result>>> _data_results;
std::vector<query::result_digest> _digest_results;
size_t _responses = 0;
bool digests_match() {
bool digests_match() const {
assert(_digest_results.size());
auto& first = *_digest_results.begin();
return std::find_if(_digest_results.begin() + 1, _digest_results.end(), [&first] (query::result_digest digest) { return digest != first; }) == _digest_results.end();
}
protected:
public:
digest_read_resolver(db::consistency_level cl, size_t block_for, size_t targets_count) : abstract_read_resolver(targets_count), _cl(cl), _block_for(block_for) {}
void add_data(gms::inet_address from, foreign_ptr<lw_shared_ptr<query::result>> result) {
_digest_results.emplace_back(result->digest());
_data_results.emplace_back(std::move(result));
got_response(from);
}
void add_digest(gms::inet_address from, query::result_digest digest) {
_digest_results.emplace_back(std::move(digest));
got_response(from);
}
foreign_ptr<lw_shared_ptr<query::result>> resolve() {
assert(_data_results.size());
if (!digests_match()) {
throw digest_mismatch_exception();
}
return std::move(*_data_results.begin());
}
bool waiting_for(gms::inet_address ep) {
return db::is_datacenter_local(_cl) ? is_me(ep) || db::is_local(ep) : true;
}
void got_response(gms::inet_address ep) {
if (!_done) {
if (_cl_responses < _block_for) {
if (waiting_for(ep)) {
_responses++;
_cl_responses++;
}
if (_responses >= db::block_for(_ks, _cl) && _data_results.size()) {
if (!digests_match()) {
throw digest_mismatch_exception();
} else {
_result_promise.set_value(std::move(*_data_results.begin()));
}
_done = true;
}
} else if (_digest_results.size() == _targets.size()) {
if (!digests_match()) {
throw digest_mismatch_exception();
if (_cl_responses == _block_for && _data_results.size()) {
_cl_promise.set_value();
}
}
if (_digest_results.size() == _targets_count) {
_done_promise.set_value();
}
}
future<> has_cl() {
return _cl_promise.get_future();
}
};
class data_read_resolver : public abstract_read_resolver {
std::unordered_map<gms::inet_address, foreign_ptr<lw_shared_ptr<reconcilable_result>>> _data_results;
public:
abstract_read_executor(storage_proxy& proxy, keyspace& ks, lw_shared_ptr<query::read_command> cmd, query::partition_range pr, db::consistency_level cl, std::vector<gms::inet_address> targets) :
_proxy(proxy), _ks(ks), _cmd(std::move(cmd)), _partition_range(std::move(pr)), _cl(cl), _targets(std::move(targets)) {}
data_read_resolver(size_t targets_count) : abstract_read_resolver(targets_count) {}
void add_mutate_data(gms::inet_address from, foreign_ptr<lw_shared_ptr<reconcilable_result>> result) {
_data_results[from] = std::move(result);
if (_data_results.size() == _targets_count) {
_done_promise.set_value();
}
}
foreign_ptr<lw_shared_ptr<reconcilable_result>> resolve() {
assert(_data_results.size());
return std::move((*_data_results.begin()).second);
}
};
class abstract_read_executor : public enable_shared_from_this<abstract_read_executor> {
protected:
using targets_iterator = std::vector<gms::inet_address>::iterator;
using digest_resolver_ptr = ::shared_ptr<digest_read_resolver>;
using data_resolver_ptr = ::shared_ptr<data_read_resolver>;
storage_proxy& _proxy;
lw_shared_ptr<query::read_command> _cmd;
query::partition_range _partition_range;
db::consistency_level _cl;
size_t _block_for;
std::vector<gms::inet_address> _targets;
promise<foreign_ptr<lw_shared_ptr<query::result>>> _result_promise;
public:
abstract_read_executor(storage_proxy& proxy, lw_shared_ptr<query::read_command> cmd, query::partition_range pr, db::consistency_level cl, size_t block_for,
std::vector<gms::inet_address> targets) :
_proxy(proxy), _cmd(std::move(cmd)), _partition_range(std::move(pr)), _cl(cl), _block_for(block_for), _targets(std::move(targets)) {}
virtual ~abstract_read_executor() {}
protected:
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>> make_mutation_data_request(gms::inet_address ep) {
if (is_me(ep)) {
return _proxy.query_mutations_locally(_cmd, _partition_range);
} else {
auto& ms = net::get_local_messaging_service();
return ms.send_message<reconcilable_result>(net::messaging_verb::READ_MUTATION_DATA, net::messaging_service::shard_id{ep, 0}, *_cmd, _partition_range).then([this](reconcilable_result&& result) {
return make_foreign(::make_lw_shared<reconcilable_result>(std::move(result)));
});
}
}
future<foreign_ptr<lw_shared_ptr<query::result>>> make_data_request(gms::inet_address ep) {
if (is_me(ep)) {
return _proxy.query_singular_local(_cmd, _partition_range);
@@ -1421,54 +1493,114 @@ public:
return ms.send_message<query::result_digest>(net::messaging_verb::READ_DIGEST, net::messaging_service::shard_id{ep, 0}, *_cmd, _partition_range);
}
}
future<> make_data_requests(targets_iterator begin, targets_iterator end) {
return parallel_for_each(begin, end, [this] (gms::inet_address ep) {
return make_data_request(ep).then([exec = shared_from_this(), ep] (foreign_ptr<lw_shared_ptr<query::result>> result) {
exec->_digest_results.emplace_back(std::move(result->digest()));
exec->_data_results.emplace_back(std::move(result));
exec->got_response(ep);
future<> make_mutation_data_requests(data_resolver_ptr resolver, targets_iterator begin, targets_iterator end) {
return parallel_for_each(begin, end, [this, resolver = std::move(resolver)] (gms::inet_address ep) {
return make_mutation_data_request(ep).then([resolver, ep] (foreign_ptr<lw_shared_ptr<reconcilable_result>> result) {
resolver->add_mutate_data(ep, std::move(result));
});
});
}
future<> make_digest_requests(targets_iterator begin, targets_iterator end) {
return parallel_for_each(begin, end, [this] (gms::inet_address ep) {
return make_digest_request(ep).then([exec = shared_from_this(), ep] (query::result_digest&& digest) {
exec->_digest_results.emplace_back(std::move(digest));
exec->got_response(ep);
future<> make_data_requests(digest_resolver_ptr resolver, targets_iterator begin, targets_iterator end) {
return parallel_for_each(begin, end, [this, resolver = std::move(resolver)] (gms::inet_address ep) {
return make_data_request(ep).then([resolver, ep] (foreign_ptr<lw_shared_ptr<query::result>> result) {
resolver->add_data(ep, std::move(result));
});
});
}
virtual future<foreign_ptr<lw_shared_ptr<query::result>>> execute() {
when_all(make_data_requests(_targets.begin(), _targets.begin() + 1),
_targets.size() > 1 ? make_digest_requests(_targets.begin() + 1, _targets.end()) : make_ready_future()).discard_result().then_wrapped([this](future<>&& f) {
future<> make_digest_requests(digest_resolver_ptr resolver, targets_iterator begin, targets_iterator end) {
return parallel_for_each(begin, end, [this, resolver = std::move(resolver)] (gms::inet_address ep) {
return make_digest_request(ep).then([resolver, ep] (query::result_digest&& digest) {
resolver->add_digest(ep, std::move(digest));
});
});
}
virtual future<> make_requests(digest_resolver_ptr resolver) {
return when_all(make_data_requests(resolver, _targets.begin(), _targets.begin() + 1),
_targets.size() > 1 ? make_digest_requests(resolver, _targets.begin() + 1, _targets.end()) : make_ready_future()).discard_result();
}
void reconciliate() {
data_resolver_ptr data_resolver = ::make_shared<data_read_resolver>(_targets.size());
auto exec = shared_from_this();
make_mutation_data_requests(data_resolver, _targets.begin(), _targets.end()).finally([exec]{});
data_resolver->done().then_wrapped([exec, data_resolver] (future<>&& f){
try {
f.get();
} catch(...) {
if (!_done) {
_result_promise.set_exception(std::current_exception());
}
auto rr = data_resolver->resolve(); // reconciliation happens here
schema_ptr s = exec->_proxy._db.local().find_schema(exec->_cmd->cf_id);
auto result = ::make_foreign(::make_lw_shared(to_data_query_result(*rr, std::move(s), exec->_cmd->slice)));
exec->_result_promise.set_value(std::move(result));
} catch(read_timeout_exception& ex) {
exec->_result_promise.set_exception(std::current_exception());
}
});
}
public:
virtual future<foreign_ptr<lw_shared_ptr<query::result>>> execute() {
digest_resolver_ptr digest_resolver = ::make_shared<digest_read_resolver>(_cl, _block_for, _targets.size());
auto exec = shared_from_this();
make_requests(digest_resolver).finally([exec]() {
// hold on to executor until all queries are complete
});
digest_resolver->has_cl().then_wrapped([exec, digest_resolver] (future<>&& f) {
try {
f.get();
exec->_result_promise.set_value(digest_resolver->resolve());
auto done = digest_resolver->done();
if (exec->_block_for < exec->_targets.size()) { // if there are more targets than needed for CL, check digests in the background
done.then_wrapped([exec, digest_resolver] (future<>&& f){
try {
f.get();
digest_resolver->resolve();
} catch(digest_mismatch_exception& ex) {
// FIXME: do read repair here
} catch(...) {
// ignore all exceptions besides digest mismatch during the background check
}
});
} else {
done.discard_result(); // no need for background check, discard done future explicitly
}
} catch (digest_mismatch_exception& ex) {
exec->reconciliate();
} catch(read_timeout_exception& ex) {
exec->_result_promise.set_exception(std::current_exception());
}
});
return _result_promise.get_future();
}
};
class never_speculating_read_executor : public abstract_read_executor {
public:
never_speculating_read_executor(storage_proxy& proxy, keyspace& ks, lw_shared_ptr<query::read_command> cmd, query::partition_range pr, db::consistency_level cl, std::vector<gms::inet_address> targets) :
abstract_read_executor(proxy, ks, std::move(cmd), std::move(pr), cl, std::move(targets)) {}
never_speculating_read_executor(storage_proxy& proxy, lw_shared_ptr<query::read_command> cmd, query::partition_range pr, db::consistency_level cl, size_t block_for, std::vector<gms::inet_address> targets) :
abstract_read_executor(proxy, std::move(cmd), std::move(pr), cl, block_for, std::move(targets)) {}
};
class always_speculating_read_executor : public abstract_read_executor {
public:
always_speculating_read_executor(storage_proxy& proxy, keyspace& ks, lw_shared_ptr<query::read_command> cmd, query::partition_range pr, db::consistency_level cl, std::vector<gms::inet_address> targets) :
abstract_read_executor(proxy, ks, std::move(cmd), std::move(pr), cl, std::move(targets)) {}
always_speculating_read_executor(storage_proxy& proxy, lw_shared_ptr<query::read_command> cmd, query::partition_range pr, db::consistency_level cl, size_t block_for, std::vector<gms::inet_address> targets) :
abstract_read_executor(proxy, std::move(cmd), std::move(pr), cl, block_for, std::move(targets)) {}
};
class speculating_read_executor : public abstract_read_executor {
public:
speculating_read_executor(storage_proxy& proxy, keyspace& ks, lw_shared_ptr<query::read_command> cmd, query::partition_range pr, db::consistency_level cl, std::vector<gms::inet_address> targets) :
abstract_read_executor(proxy, ks, std::move(cmd), std::move(pr), cl, std::move(targets)) {}
speculating_read_executor(storage_proxy& proxy, lw_shared_ptr<query::read_command> cmd, query::partition_range pr, db::consistency_level cl, size_t block_for, std::vector<gms::inet_address> targets) :
abstract_read_executor(proxy, std::move(cmd), std::move(pr), cl, block_for, std::move(targets)) {}
};
class range_slice_read_executor : public abstract_read_executor {
public:
range_slice_read_executor(storage_proxy& proxy, lw_shared_ptr<query::read_command> cmd, query::partition_range pr, db::consistency_level cl, std::vector<gms::inet_address> targets) :
abstract_read_executor(proxy, std::move(cmd), std::move(pr), cl, targets.size(), std::move(targets)) {}
virtual future<foreign_ptr<lw_shared_ptr<query::result>>> execute() override {
reconciliate();
return _result_promise.get_future();
}
};
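Putting the executor together: execute() sends one data request plus digest requests, fulfills the client promise as soon as block_for consistent responses arrive, lets remaining digests be verified in the background, and on a mismatch falls back to reconciliate(), which re-reads full mutation data from every target and merges it. A compressed, synchronous sketch of the digest decision (hypothetical read_result type; the real path is future-based):

#include <cstddef>
#include <cstdint>
#include <optional>
#include <stdexcept>
#include <vector>

using digest = uint64_t;

struct read_result { digest d; /* rows... */ };

// Decide the digest-read outcome: return the data result if all digests
// agree, otherwise signal that a full-data reconciliation round is needed.
std::optional<read_result> resolve_digest_read(const read_result& data,
                                               const std::vector<digest>& digests,
                                               std::size_t block_for) {
    if (digests.size() + 1 < block_for) {           // data reply counts as one
        throw std::runtime_error("Read operation timed out");
    }
    for (digest d : digests) {
        if (d != data.d) {
            return std::nullopt;        // mismatch: caller must reconcile
        }
    }
    return data;                        // fast path: digests agree
}

On the nullopt path the executor switches to make_mutation_data_requests() against every target and merges the reconcilable_results, as reconciliate() does above.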
enum class speculative_retry_type {
@@ -1497,16 +1629,17 @@ enum class speculative_retry_type {
#endif
speculative_retry_type retry_type = speculative_retry_type::NONE;//cfs.metadata.getSpeculativeRetry().type;
size_t block_for = db::block_for(ks, cl);
// Speculative retry is disabled *OR* there are simply no extra replicas to speculate.
if (retry_type == speculative_retry_type::NONE || block_for(ks, cl) == all_replicas.size()) {
return ::make_shared<never_speculating_read_executor>(*this, ks, cmd, std::move(pr), cl, std::move(target_replicas));
if (retry_type == speculative_retry_type::NONE || db::block_for(ks, cl) == all_replicas.size()) {
return ::make_shared<never_speculating_read_executor>(*this, cmd, std::move(pr), cl, block_for, std::move(target_replicas));
}
if (target_replicas.size() == all_replicas.size()) {
// CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC.
// We are going to contact every node anyway, so ask for 2 full data requests instead of 1, for redundancy
// (same amount of requests in total, but we turn 1 digest request into a full blown data request).
return ::make_shared<always_speculating_read_executor>(/*cfs, */*this, ks, cmd, std::move(pr), cl, std::move(target_replicas));
return ::make_shared<always_speculating_read_executor>(/*cfs, */*this, cmd, std::move(pr), cl, block_for, std::move(target_replicas));
}
// RRD.NONE or RRD.DC_LOCAL w/ multiple DCs.
@@ -1522,9 +1655,9 @@ enum class speculative_retry_type {
target_replicas.push_back(extra_replica);
if (retry_type == speculative_retry_type::ALWAYS) {
return ::make_shared<always_speculating_read_executor>(/*cfs,*/ *this, ks, cmd, std::move(pr), cl, std::move(target_replicas));
return ::make_shared<always_speculating_read_executor>(/*cfs,*/ *this, cmd, std::move(pr), cl, block_for, std::move(target_replicas));
} else {// PERCENTILE or CUSTOM.
return ::make_shared<speculating_read_executor>(/*cfs,*/ *this, ks, cmd, std::move(pr), cl, std::move(target_replicas));
return ::make_shared<speculating_read_executor>(/*cfs,*/ *this, cmd, std::move(pr), cl, block_for, std::move(target_replicas));
}
}
@@ -1569,6 +1702,132 @@ storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd, std::vecto
});
}
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>>
storage_proxy::query_partition_key_range_concurrent(std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results,
lw_shared_ptr<query::read_command> cmd, db::consistency_level cl, std::vector<query::partition_range>::iterator&& i,
std::vector<query::partition_range>&& ranges, int concurrency_factor) {
schema_ptr schema = _db.local().find_schema(cmd->cf_id);
keyspace& ks = _db.local().find_keyspace(schema->ks_name());
std::vector<::shared_ptr<abstract_read_executor>> exec;
auto concurrent_fetch_starting_index = i;
while (i != ranges.end() && std::distance(concurrent_fetch_starting_index, i) < concurrency_factor) {
query::partition_range& range = *i;
std::vector<gms::inet_address> live_endpoints = get_live_sorted_endpoints(ks, range.end_value().token());
std::vector<gms::inet_address> filtered_endpoints = filter_for_query(cl, ks, live_endpoints);
++i;
// getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take
// the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
// still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand.
while (i != ranges.end())
{
auto next_range = i;
std::vector<gms::inet_address> next_endpoints = get_live_sorted_endpoints(ks, next_range->end_value().token());
std::vector<gms::inet_address> next_filtered_endpoints = filter_for_query(cl, ks, next_endpoints);
// Origin has this to say here:
// * If the current range right is the min token, we should stop merging because CFS.getRangeSlice
// * don't know how to deal with a wrapping range.
// * Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwraps
// * the range if necessary and deal with it. However, we can't start sending wrapped range without breaking
// * wire compatibility, so It's likely easier not to bother;
// It obviously does not apply to us(?), but let's follow Origin for now
if (range.end_value().token() == dht::minimum_token()) {
break;
}
std::vector<gms::inet_address> merged = intersection(live_endpoints, next_endpoints);
// Check if there are enough endpoints for the merge to be possible.
if (!is_sufficient_live_nodes(cl, ks, merged)) {
break;
}
std::vector<gms::inet_address> filtered_merged = filter_for_query(cl, ks, merged);
// Estimate whether merging will be a win or not
if (!locator::i_endpoint_snitch::get_local_snitch_ptr()->is_worth_merging_for_range_query(filtered_merged, filtered_endpoints, next_filtered_endpoints)) {
break;
}
// If we get there, merge this range and the next one
range = query::partition_range(range.start(), next_range->end());
live_endpoints = std::move(merged);
filtered_endpoints = std::move(filtered_merged);
++i;
}
db::assure_sufficient_live_nodes(cl, ks, filtered_endpoints);
exec.push_back(::make_shared<range_slice_read_executor>(*this, cmd, std::move(range), cl, std::move(filtered_endpoints)));
}
query::result_merger merger;
merger.reserve(exec.size());
auto f = ::map_reduce(exec.begin(), exec.end(), [this, cmd] (::shared_ptr<abstract_read_executor>& rex) {
return rex->execute();
}, std::move(merger));
return f.then([this, exec = std::move(exec), results = std::move(results), i = std::move(i), ranges = std::move(ranges), cl, cmd, concurrency_factor]
(foreign_ptr<lw_shared_ptr<query::result>>&& result) mutable {
results.emplace_back(std::move(result));
if (i == ranges.end()) {
return make_ready_future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>>(std::move(results));
} else {
return query_partition_key_range_concurrent(std::move(results), cmd, cl, std::move(i), std::move(ranges), concurrency_factor);
}
});
}
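The inner while loop implements Origin's range-merging heuristic: keep folding the next vnode range into the current one while the intersection of live endpoints still satisfies the consistency level and the snitch thinks the merge is worthwhile. A stripped-down version of the merge decision over int endpoints, with a size-based cl_nodes test standing in for is_sufficient_live_nodes() and the snitch check omitted:

#include <algorithm>
#include <iterator>
#include <utility>
#include <vector>

struct subrange { int start, end; std::vector<int> endpoints; };

// Order-preserving endpoint intersection, same idea as
// storage_proxy::intersection() later in this file.
std::vector<int> intersect(const std::vector<int>& a, const std::vector<int>& b) {
    std::vector<int> out;
    std::copy_if(a.begin(), a.end(), std::back_inserter(out),
                 [&b](int e) { return std::find(b.begin(), b.end(), e) != b.end(); });
    return out;
}

std::vector<subrange> merge_ranges(std::vector<subrange> in, std::size_t cl_nodes) {
    std::vector<subrange> out;
    for (auto& r : in) {
        if (!out.empty()) {
            auto merged = intersect(out.back().endpoints, r.endpoints);
            if (merged.size() >= cl_nodes) {        // CL still satisfiable: merge
                out.back().end = r.end;
                out.back().endpoints = std::move(merged);
                continue;
            }
        }
        out.push_back(std::move(r));                // start a new subrange
    }
    return out;
}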
future<foreign_ptr<lw_shared_ptr<query::result>>>
storage_proxy::query_partition_key_range(lw_shared_ptr<query::read_command> cmd, query::partition_range&& range, db::consistency_level cl) {
schema_ptr schema = _db.local().find_schema(cmd->cf_id);
keyspace& ks = _db.local().find_keyspace(schema->ks_name());
std::vector<query::partition_range> ranges;
// when dealing with LocalStrategy keyspaces, we can skip the range splitting and merging (which can be
// expensive in clusters with vnodes)
if (ks.get_replication_strategy().get_type() == locator::replication_strategy_type::local) {
ranges.emplace_back(std::move(range));
} else {
ranges = get_restricted_ranges(ks, *schema, std::move(range));
}
// our estimate of how many result rows there will be per-range
float result_rows_per_range = estimate_result_rows_per_range(cmd, ks);
// underestimate how many rows we will get per-range in order to increase the likelihood that we'll
// fetch enough rows in the first round
result_rows_per_range -= result_rows_per_range * CONCURRENT_SUBREQUESTS_MARGIN;
int concurrency_factor = result_rows_per_range == 0.0 ? 1 : std::max(1, std::min(int(ranges.size()), int(std::ceil(cmd->row_limit / result_rows_per_range))));
std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results;
results.reserve(ranges.size()/concurrency_factor + 1);
return query_partition_key_range_concurrent(std::move(results), cmd, cl, ranges.begin(), std::move(ranges), concurrency_factor)
.then([](std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results) {
query::result_merger merger;
merger.reserve(results.size());
for (auto&& r: results) {
merger(std::move(r));
}
return merger.get();
});
}
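Worked example of the concurrency computation above: with row_limit = 100, 256 restricted ranges, and an estimated 5 rows per range reduced by the 10% CONCURRENT_SUBREQUESTS_MARGIN to 4.5, the first round queries ceil(100 / 4.5) = 23 ranges rather than all 256 (sample numbers, not from the patch):

#include <algorithm>
#include <cmath>
#include <cstdio>

int main() {
    float rows_per_range = 5.0f;
    rows_per_range -= rows_per_range * 0.10f;      // CONCURRENT_SUBREQUESTS_MARGIN
    int row_limit = 100, n_ranges = 256;
    int concurrency = rows_per_range == 0.0f ? 1
        : std::max(1, std::min(n_ranges, int(std::ceil(row_limit / rows_per_range))));
    std::printf("%d\n", concurrency);              // prints 23
}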
future<foreign_ptr<lw_shared_ptr<query::result>>>
storage_proxy::query_local(lw_shared_ptr<query::read_command> cmd, std::vector<query::partition_range>&& partition_ranges) {
// FIXME: Respect cmd->row_limit to avoid unnecessary transfer
query::result_merger merger;
merger.reserve(smp::count);
return _db.map_reduce(std::move(merger), [cmd, partition_ranges = std::move(partition_ranges)] (database& db) {
return db.query(*cmd, partition_ranges).then([] (auto&& f) {
return make_foreign(std::move(f));
});
}).finally([cmd] {});
}
future<foreign_ptr<lw_shared_ptr<query::result>>>
storage_proxy::query(schema_ptr s, lw_shared_ptr<query::read_command> cmd, std::vector<query::partition_range>&& partition_ranges, db::consistency_level cl) {
static auto make_empty = [] {
@@ -1587,25 +1846,12 @@ storage_proxy::query(schema_ptr s, lw_shared_ptr<query::read_command> cmd, std::
}
}
return do_with(std::move(partition_ranges), [this, s, cmd](std::vector<query::partition_range>& prs) {
return map_reduce(prs.begin(), prs.end(), [this, s, cmd](const query::partition_range& pr) {
return query_mutations_locally(cmd, pr).then([s, cmd](auto&& result_ptr) {
return make_foreign(make_lw_shared(to_data_query_result(*result_ptr, s, cmd->slice)));
});
}, query::result_merger());
});
}
if (partition_ranges.size() != 1) {
// FIXME: implement
throw std::runtime_error("more than one non singular range not supported yet");
}
future<foreign_ptr<lw_shared_ptr<query::result>>>
storage_proxy::query_local(lw_shared_ptr<query::read_command> cmd, std::vector<query::partition_range>&& partition_ranges) {
// FIXME: Respect cmd->row_limit to avoid unnecessary transfer
query::result_merger merger;
merger.reserve(smp::count);
return _db.map_reduce(std::move(merger), [cmd, partition_ranges = std::move(partition_ranges)] (database& db) {
return db.query(*cmd, partition_ranges).then([] (auto&& f) {
return make_foreign(std::move(f));
});
}).finally([cmd] {});
return query_partition_key_range(cmd, std::move(partition_ranges[0]), cl);
}
#if 0
@@ -1966,60 +2212,60 @@ std::vector<gms::inet_address> storage_proxy::get_live_sorted_endpoints(keyspace
return eps;
}
#if 0
private static List<InetAddress> intersection(List<InetAddress> l1, List<InetAddress> l2)
{
// Note: we don't use Guava Sets.intersection() for 3 reasons:
// 1) retainAll would be inefficient if l1 and l2 are large but in practice both are the replicas for a range and
// so will be very small (< RF). In that case, retainAll is in fact more efficient.
// 2) we do ultimately need a list so converting everything to sets don't make sense
// 3) l1 and l2 are sorted by proximity. The use of retainAll maintain that sorting in the result, while using sets wouldn't.
List<InetAddress> inter = new ArrayList<InetAddress>(l1);
inter.retainAll(l2);
return inter;
}
std::vector<gms::inet_address> storage_proxy::intersection(const std::vector<gms::inet_address>& l1, const std::vector<gms::inet_address>& l2) {
std::vector<gms::inet_address> inter;
inter.reserve(l1.size());
std::remove_copy_if(l1.begin(), l1.end(), std::back_inserter(inter), [&l2] (const gms::inet_address& a) {
return std::find(l2.begin(), l2.end(), a) == l2.end();
});
return inter;
}
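Reason 3 in the Origin comment is why std::remove_copy_if is used here: it copies survivors in l1's iteration order, preserving the proximity sort, where a set-based intersection would not. A quick check with ints standing in for addresses:

#include <algorithm>
#include <iterator>
#include <vector>

int main() {
    std::vector<int> l1{3, 1, 2};   // "sorted by proximity"
    std::vector<int> l2{2, 3};
    std::vector<int> inter;
    std::remove_copy_if(l1.begin(), l1.end(), std::back_inserter(inter),
        [&l2](int a) { return std::find(l2.begin(), l2.end(), a) == l2.end(); });
    // inter == {3, 2}: l1's proximity order is preserved.
}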
/**
* Estimate the number of result rows (either cql3 rows or storage rows, as called for by the command) per
* range in the ring based on our local data. This assumes that ranges are uniformly distributed across the cluster
* and that the queried data is also uniformly distributed.
*/
private static float estimateResultRowsPerRange(AbstractRangeCommand command, Keyspace keyspace)
/**
* Estimate the number of result rows (either cql3 rows or storage rows, as called for by the command) per
* range in the ring based on our local data. This assumes that ranges are uniformly distributed across the cluster
* and that the queried data is also uniformly distributed.
*/
float storage_proxy::estimate_result_rows_per_range(lw_shared_ptr<query::read_command> cmd, keyspace& ks)
{
return 1.0;
#if 0
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.columnFamily);
float resultRowsPerRange = Float.POSITIVE_INFINITY;
if (command.rowFilter != null && !command.rowFilter.isEmpty())
{
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.columnFamily);
float resultRowsPerRange = Float.POSITIVE_INFINITY;
if (command.rowFilter != null && !command.rowFilter.isEmpty())
{
List<SecondaryIndexSearcher> searchers = cfs.indexManager.getIndexSearchersForQuery(command.rowFilter);
if (searchers.isEmpty())
{
resultRowsPerRange = calculateResultRowsUsingEstimatedKeys(cfs);
}
else
{
// Secondary index query (cql3 or otherwise). Estimate result rows based on most selective 2ary index.
for (SecondaryIndexSearcher searcher : searchers)
{
// use our own mean column count as our estimate for how many matching rows each node will have
SecondaryIndex highestSelectivityIndex = searcher.highestSelectivityIndex(command.rowFilter);
resultRowsPerRange = Math.min(resultRowsPerRange, highestSelectivityIndex.estimateResultRows());
}
}
}
else if (!command.countCQL3Rows())
{
// non-cql3 query
resultRowsPerRange = cfs.estimateKeys();
}
else
List<SecondaryIndexSearcher> searchers = cfs.indexManager.getIndexSearchersForQuery(command.rowFilter);
if (searchers.isEmpty())
{
resultRowsPerRange = calculateResultRowsUsingEstimatedKeys(cfs);
}
// adjust resultRowsPerRange by the number of tokens this node has and the replication factor for this ks
return (resultRowsPerRange / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor();
else
{
// Secondary index query (cql3 or otherwise). Estimate result rows based on most selective 2ary index.
for (SecondaryIndexSearcher searcher : searchers)
{
// use our own mean column count as our estimate for how many matching rows each node will have
SecondaryIndex highestSelectivityIndex = searcher.highestSelectivityIndex(command.rowFilter);
resultRowsPerRange = Math.min(resultRowsPerRange, highestSelectivityIndex.estimateResultRows());
}
}
}
else if (!command.countCQL3Rows())
{
// non-cql3 query
resultRowsPerRange = cfs.estimateKeys();
}
else
{
resultRowsPerRange = calculateResultRowsUsingEstimatedKeys(cfs);
}
// adjust resultRowsPerRange by the number of tokens this node has and the replication factor for this ks
return (resultRowsPerRange / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor();
#endif
}
#if 0
private static float calculateResultRowsUsingEstimatedKeys(ColumnFamilyStore cfs)
{
if (cfs.metadata.comparator.isDense())
@@ -2034,229 +2280,6 @@ std::vector<gms::inet_address> storage_proxy::get_live_sorted_endpoints(keyspace
}
}
public static List<Row> getRangeSlice(AbstractRangeCommand command, ConsistencyLevel consistency_level)
throws UnavailableException, ReadTimeoutException
{
Tracing.trace("Computing ranges to query");
long startTime = System.nanoTime();
Keyspace keyspace = Keyspace.open(command.keyspace);
List<Row> rows;
// now scan until we have enough results
try
{
int cql3RowCount = 0;
rows = new ArrayList<>();
// when dealing with LocalStrategy keyspaces, we can skip the range splitting and merging (which can be
// expensive in clusters with vnodes)
List<? extends AbstractBounds<RowPosition>> ranges;
if (keyspace.getReplicationStrategy() instanceof LocalStrategy)
ranges = command.keyRange.unwrap();
else
ranges = getRestrictedRanges(command.keyRange);
// our estimate of how many result rows there will be per-range
float resultRowsPerRange = estimateResultRowsPerRange(command, keyspace);
// underestimate how many rows we will get per-range in order to increase the likelihood that we'll
// fetch enough rows in the first round
resultRowsPerRange -= resultRowsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
int concurrencyFactor = resultRowsPerRange == 0.0
? 1
: Math.max(1, Math.min(ranges.size(), (int) Math.ceil(command.limit() / resultRowsPerRange)));
logger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
resultRowsPerRange, command.limit(), ranges.size(), concurrencyFactor);
Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", new Object[]{ ranges.size(), concurrencyFactor, resultRowsPerRange});
boolean haveSufficientRows = false;
int i = 0;
AbstractBounds<RowPosition> nextRange = null;
List<InetAddress> nextEndpoints = null;
List<InetAddress> nextFilteredEndpoints = null;
while (i < ranges.size())
{
List<Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>>> scanHandlers = new ArrayList<>(concurrencyFactor);
int concurrentFetchStartingIndex = i;
int concurrentRequests = 0;
while ((i - concurrentFetchStartingIndex) < concurrencyFactor)
{
AbstractBounds<RowPosition> range = nextRange == null
? ranges.get(i)
: nextRange;
List<InetAddress> liveEndpoints = nextEndpoints == null
? getLiveSortedEndpoints(keyspace, range.right)
: nextEndpoints;
List<InetAddress> filteredEndpoints = nextFilteredEndpoints == null
? consistency_level.filterForQuery(keyspace, liveEndpoints)
: nextFilteredEndpoints;
++i;
++concurrentRequests;
// getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take
// the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
// still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand.
while (i < ranges.size())
{
nextRange = ranges.get(i);
nextEndpoints = getLiveSortedEndpoints(keyspace, nextRange.right);
nextFilteredEndpoints = consistency_level.filterForQuery(keyspace, nextEndpoints);
// If the current range right is the min token, we should stop merging because CFS.getRangeSlice
// don't know how to deal with a wrapping range.
// Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwraps
// the range if necessary and deal with it. However, we can't start sending wrapped range without breaking
// wire compatibility, so It's likely easier not to bother;
if (range.right.isMinimum())
break;
List<InetAddress> merged = intersection(liveEndpoints, nextEndpoints);
// Check if there is enough endpoint for the merge to be possible.
if (!consistency_level.isSufficientLiveNodes(keyspace, merged))
break;
List<InetAddress> filteredMerged = consistency_level.filterForQuery(keyspace, merged);
// Estimate whether merging will be a win or not
if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints))
break;
// If we get there, merge this range and the next one
range = range.withNewRight(nextRange.right);
liveEndpoints = merged;
filteredEndpoints = filteredMerged;
++i;
}
AbstractRangeCommand nodeCmd = command.forSubRange(range);
// collect replies and resolve according to consistency level
RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
List<InetAddress> minimalEndpoints = filteredEndpoints.subList(0, Math.min(filteredEndpoints.size(), consistency_level.blockFor(keyspace)));
ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback<>(resolver, consistency_level, nodeCmd, minimalEndpoints);
handler.assureSufficientLiveNodes();
resolver.setSources(filteredEndpoints);
if (filteredEndpoints.size() == 1
&& filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress())
&& OPTIMIZE_LOCAL_REQUESTS)
{
StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler), Tracing.instance.get());
}
else
{
MessageOut<? extends AbstractRangeCommand> message = nodeCmd.createMessage();
for (InetAddress endpoint : filteredEndpoints)
{
Tracing.trace("Enqueuing request to {}", endpoint);
MessagingService.instance().sendRR(message, endpoint, handler);
}
}
scanHandlers.add(Pair.create(nodeCmd, handler));
}
Tracing.trace("Submitted {} concurrent range requests covering {} ranges", concurrentRequests, i - concurrentFetchStartingIndex);
List<AsyncOneResponse> repairResponses = new ArrayList<>();
for (Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>> cmdPairHandler : scanHandlers)
{
AbstractRangeCommand nodeCmd = cmdPairHandler.left;
ReadCallback<RangeSliceReply, Iterable<Row>> handler = cmdPairHandler.right;
RangeSliceResponseResolver resolver = (RangeSliceResponseResolver)handler.resolver;
try
{
for (Row row : handler.get())
{
rows.add(row);
if (nodeCmd.countCQL3Rows())
cql3RowCount += row.getLiveCount(command.predicate, command.timestamp);
}
repairResponses.addAll(resolver.repairResults);
}
catch (ReadTimeoutException ex)
{
// we timed out waiting for responses
int blockFor = consistency_level.blockFor(keyspace);
int responseCount = resolver.responses.size();
String gotData = responseCount > 0
? resolver.isDataPresent() ? " (including data)" : " (only digests)"
: "";
if (Tracing.isTracing())
{
Tracing.trace("Timed out; received {} of {} responses{} for range {} of {}",
new Object[]{ responseCount, blockFor, gotData, i, ranges.size() });
}
else if (logger.isDebugEnabled())
{
logger.debug("Range slice timeout; received {} of {} responses{} for range {} of {}",
responseCount, blockFor, gotData, i, ranges.size());
}
throw ex;
}
catch (DigestMismatchException e)
{
throw new AssertionError(e); // no digests in range slices yet
}
// if we're done, great, otherwise, move to the next range
int count = nodeCmd.countCQL3Rows() ? cql3RowCount : rows.size();
if (count >= nodeCmd.limit())
{
haveSufficientRows = true;
break;
}
}
try
{
FBUtilities.waitOnFutures(repairResponses, DatabaseDescriptor.getWriteRpcTimeout());
}
catch (TimeoutException ex)
{
// We got all responses, but timed out while repairing
int blockFor = consistency_level.blockFor(keyspace);
if (Tracing.isTracing())
Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
else
logger.debug("Range slice timeout while read-repairing after receiving all {} data and digest responses", blockFor);
throw new ReadTimeoutException(consistency_level, blockFor-1, blockFor, true);
}
if (haveSufficientRows)
return trim(command, rows);
// we didn't get enough rows in our concurrent fetch; recalculate our concurrency factor
// based on the results we've seen so far (as long as we still have ranges left to query)
if (i < ranges.size())
{
float fetchedRows = command.countCQL3Rows() ? cql3RowCount : rows.size();
float remainingRows = command.limit() - fetchedRows;
float actualRowsPerRange;
if (fetchedRows == 0.0)
{
// we haven't actually gotten any results, so query all remaining ranges at once
actualRowsPerRange = 0.0f;
concurrencyFactor = ranges.size() - i;
}
else
{
actualRowsPerRange = i / fetchedRows;
concurrencyFactor = Math.max(1, Math.min(ranges.size() - i, Math.round(remainingRows / actualRowsPerRange)));
}
logger.debug("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
actualRowsPerRange, (int) remainingRows, concurrencyFactor);
}
}
}
finally
{
long latency = System.nanoTime() - startTime;
rangeMetrics.addNano(latency);
Keyspace.open(command.keyspace).getColumnFamilyStore(command.columnFamily).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
}
return trim(command, rows);
}
private static List<Row> trim(AbstractRangeCommand command, List<Row> rows)
{
// When maxIsColumns, we let the caller trim the result.
@@ -2344,54 +2367,53 @@ std::vector<gms::inet_address> storage_proxy::get_live_sorted_endpoints(keyspace
return results;
}
#endif
/**
* Compute all ranges we're going to query, in sorted order. Nodes can be replica destinations for many ranges,
* so we need to restrict each scan to the specific range we want, or else we'd get duplicate results.
*/
static <T extends RingPosition<T>> List<AbstractBounds<T>> getRestrictedRanges(final AbstractBounds<T> queryRange)
{
// special case for bounds containing exactly 1 (non-minimum) token
if (queryRange instanceof Bounds && queryRange.left.equals(queryRange.right) && !queryRange.left.isMinimum())
{
return Collections.singletonList(queryRange);
}
TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata();
List<AbstractBounds<T>> ranges = new ArrayList<AbstractBounds<T>>();
// divide the queryRange into pieces delimited by the ring and minimum tokens
Iterator<Token> ringIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), queryRange.left.getToken(), true);
AbstractBounds<T> remainder = queryRange;
while (ringIter.hasNext())
{
/*
* remainder can be a range/bounds of token _or_ keys and we want to split it with a token:
* - if remainder is tokens, then we'll just split using the provided token.
* - if remainder is keys, we want to split using token.upperBoundKey. For instance, if remainder
* is [DK(10, 'foo'), DK(20, 'bar')], and we have 3 nodes with tokens 0, 15, 30. We want to
* split remainder to A=[DK(10, 'foo'), 15] and B=(15, DK(20, 'bar')]. But since we can't mix
* tokens and keys at the same time in a range, we uses 15.upperBoundKey() to have A include all
* keys having 15 as token and B include none of those (since that is what our node owns).
* asSplitValue() abstracts that choice.
*/
Token upperBoundToken = ringIter.next();
T upperBound = (T)upperBoundToken.upperBound(queryRange.left.getClass());
if (!remainder.left.equals(upperBound) && !remainder.contains(upperBound))
// no more splits
break;
Pair<AbstractBounds<T>,AbstractBounds<T>> splits = remainder.split(upperBound);
if (splits == null)
continue;
ranges.add(splits.left);
remainder = splits.right;
}
ranges.add(remainder);
return ranges;
/**
* Compute all ranges we're going to query, in sorted order. Nodes can be replica destinations for many ranges,
* so we need to restrict each scan to the specific range we want, or else we'd get duplicate results.
*/
std::vector<query::partition_range>
storage_proxy::get_restricted_ranges(keyspace& ks, const schema& s, query::partition_range range) {
// special case for bounds containing exactly 1 (non-minimum) token
if (range.is_singular()) {
return std::vector<query::partition_range>({std::move(range)});
}
locator::token_metadata& tm = get_local_storage_service().get_token_metadata();
std::vector<query::partition_range> ranges;
// divide the queryRange into pieces delimited by the ring and minimum tokens
auto ring_iter = tm.ring_range(range.start_value().token(), true);
query::partition_range remainder = range;
for(const dht::token& upper_bound_token: ring_iter)
{
/*
* remainder can be a range/bounds of token _or_ keys and we want to split it with a token:
* - if remainder is tokens, then we'll just split using the provided token.
* - if remainder is keys, we want to split using token.upperBoundKey. For instance, if remainder
* is [DK(10, 'foo'), DK(20, 'bar')], and we have 3 nodes with tokens 0, 15, 30. We want to
* split remainder to A=[DK(10, 'foo'), 15] and B=(15, DK(20, 'bar')]. But since we can't mix
* tokens and keys at the same time in a range, we use 15.upperBoundKey() to have A include all
* keys having 15 as token and B include none of those (since that is what our node owns).
* asSplitValue() abstracts that choice.
*/
dht::ring_position split_point(upper_bound_token);
if (!remainder.contains(split_point, dht::ring_position_comparator(s))) {
break; // no more splits
}
std::pair<query::partition_range, query::partition_range> splits = remainder.split(split_point, dht::ring_position_comparator(s));
ranges.emplace_back(std::move(splits.first));
remainder = std::move(splits.second);
}
ranges.emplace_back(std::move(remainder));
return ranges;
}
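Net effect of get_restricted_ranges(): the query range is cut at every ring position it spans, one subrange per vnode interval, with each split point staying in the left piece (range::split() above). A toy version over ints, ignoring wrap-around and using the raw token where the real code uses a token's upper-bound ring_position:

#include <utility>
#include <vector>

// Cut [start, end] at each ring point inside it; each cut point stays
// with the piece to its left, mirroring range::split().
std::vector<std::pair<int, int>> restrict_ranges(int start, int end,
                                                 const std::vector<int>& ring) {
    std::vector<std::pair<int, int>> out;
    int lo = start;
    for (int t : ring) {                // ring tokens in sorted order
        if (t >= lo && t < end) {       // split point must be inside remainder
            out.emplace_back(lo, t);
            lo = t + 1;
        }
    }
    out.emplace_back(lo, end);          // whatever is left
    return out;
}
// restrict_ranges(5, 40, {0, 10, 20, 30})
//   -> {5,10} {11,20} {21,30} {31,40}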
#if 0
public long getReadOperations()
{
return readMetrics.latency.count();

View File

@@ -61,6 +61,7 @@ private:
size_t _total_hints_in_progress = 0;
std::unordered_map<gms::inet_address, size_t> _hints_in_progress;
stats _stats;
static constexpr float CONCURRENT_SUBREQUESTS_MARGIN = 0.10;
private:
void init_messaging_service();
future<foreign_ptr<lw_shared_ptr<query::result>>> query_singular(lw_shared_ptr<query::read_command> cmd, std::vector<query::partition_range>&& partition_ranges, db::consistency_level cl);
@@ -81,6 +82,11 @@ private:
::shared_ptr<abstract_read_executor> get_read_executor(lw_shared_ptr<query::read_command> cmd, query::partition_range pr, db::consistency_level cl);
future<foreign_ptr<lw_shared_ptr<query::result>>> query_singular_local(lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr);
future<query::result_digest> query_singular_local_digest(lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr);
future<foreign_ptr<lw_shared_ptr<query::result>>> query_partition_key_range(lw_shared_ptr<query::read_command> cmd, query::partition_range&& range, db::consistency_level cl);
std::vector<query::partition_range> get_restricted_ranges(keyspace& ks, const schema& s, query::partition_range range);
float estimate_result_rows_per_range(lw_shared_ptr<query::read_command> cmd, keyspace& ks);
static std::vector<gms::inet_address> intersection(const std::vector<gms::inet_address>& l1, const std::vector<gms::inet_address>& l2);
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>> query_partition_key_range_concurrent(std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results, lw_shared_ptr<query::read_command> cmd, db::consistency_level cl, std::vector<query::partition_range>::iterator&& i, std::vector<query::partition_range>&& ranges, int concurrency_factor);
public:
storage_proxy(distributed<database>& db);