diff --git a/dht/i_partitioner.cc b/dht/i_partitioner.cc index 3a03806a16..7610ecd39e 100644 --- a/dht/i_partitioner.cc +++ b/dht/i_partitioner.cc @@ -145,12 +145,18 @@ bool operator<(const token& t1, const token& t2) } std::ostream& operator<<(std::ostream& out, const token& t) { - auto flags = out.flags(); - for (auto c : t._data) { - unsigned char x = c; - out << std::hex << std::setw(2) << std::setfill('0') << +x << " "; + if (t._kind == token::kind::after_all_keys) { + out << "maximum token"; + } else if (t._kind == token::kind::before_all_keys) { + out << "minimum token"; + } else { + auto flags = out.flags(); + for (auto c : t._data) { + unsigned char x = c; + out << std::hex << std::setw(2) << std::setfill('0') << +x << " "; + } + out.flags(flags); } - out.flags(flags); return out; } @@ -286,4 +292,14 @@ unsigned shard_of(const token& t) { return v % smp::count; } +int ring_position_comparator::operator()(const ring_position& lh, const ring_position& rh) const { + if (lh.less_compare(s, rh)) { + return -1; + } else if (lh.equal(s, rh)) { + return 0; + } else { + return 1; + } +} + } diff --git a/dht/i_partitioner.hh b/dht/i_partitioner.hh index 9eaf9d421b..5b1edc181c 100644 --- a/dht/i_partitioner.hh +++ b/dht/i_partitioner.hh @@ -269,6 +269,26 @@ public: return { _token, *_key }; } + bool equal(const schema& s, const ring_position& lhr) const { + if (_token != lhr._token) { + return false; + } else if (!_key || !lhr._key){ + return true; // empty key "matches" any other key + } else { + return _key->legacy_equal(s, *lhr._key); + }; + } + + bool less_compare(const schema& s, const ring_position& lhr) const { + if (_token != lhr._token) { + return _token < lhr._token; + } else if (!_key || !lhr._key) { + return false; + } else { + return _key->legacy_tri_compare(s, *lhr._key) < 0; + } + } + size_t serialized_size() const; void serialize(bytes::iterator& out) const; static ring_position deserialize(bytes_view& in); @@ -276,6 +296,11 @@ public: friend 
std::ostream& operator<<(std::ostream&, const ring_position&); }; +struct ring_position_comparator { + const schema& s; + ring_position_comparator(const schema& s_) : s(s_) {} + int operator()(const ring_position& lh, const ring_position& rh) const; +}; std::ostream& operator<<(std::ostream& out, const token& t); diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 86b2431398..2a6269e5e6 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -122,39 +122,50 @@ private: public std::iterator { private: tokens_iterator(std::vector::const_iterator it, size_t pos) - : _cur_it(it), _ring_pos(pos) {} + : _cur_it(it), _ring_pos(pos), _insert_min(false) {} public: - tokens_iterator(const token& start, token_metadata* token_metadata) + tokens_iterator(const token& start, token_metadata* token_metadata, bool include_min = false) : _token_metadata(token_metadata) { - _cur_it = _token_metadata->sorted_tokens().begin() + - _token_metadata->first_token_index(start); + _cur_it = _token_metadata->sorted_tokens().begin() + _token_metadata->first_token_index(start); + _insert_min = include_min && *_token_metadata->sorted_tokens().begin() != dht::minimum_token(); + if (_token_metadata->sorted_tokens().empty()) { + _min = true; + } } bool operator==(const tokens_iterator& it) const { - return _cur_it == it._cur_it; + return _min == it._min && _cur_it == it._cur_it; } bool operator!=(const tokens_iterator& it) const { - return _cur_it != it._cur_it; + return _min != it._min || _cur_it != it._cur_it; } const token& operator*() { - return *_cur_it; + if (_min) { + return _min_token; + } else { + return *_cur_it; + } } tokens_iterator& operator++() { - if (_ring_pos >= _token_metadata->sorted_tokens().size()) { - _cur_it = _token_metadata->sorted_tokens().end(); - } else { - ++_cur_it; - ++_ring_pos; + if (!_min) { + if (_ring_pos >= _token_metadata->sorted_tokens().size()) { + _cur_it = _token_metadata->sorted_tokens().end(); + } else { + ++_cur_it; 
+ ++_ring_pos; - if (_cur_it == _token_metadata->sorted_tokens().end()) { - _cur_it = _token_metadata->sorted_tokens().begin(); + if (_cur_it == _token_metadata->sorted_tokens().end()) { + _cur_it = _token_metadata->sorted_tokens().begin(); + _min = _insert_min; + } } + } else { + _min = false; } - return *this; } @@ -165,6 +176,9 @@ private: // "start" // size_t _ring_pos = 0; + bool _insert_min; + bool _min = false; + const token _min_token = dht::minimum_token(); token_metadata* _token_metadata = nullptr; friend class token_metadata; @@ -208,8 +222,8 @@ public: * * @return The requested range (see the description above) */ - auto ring_range(const token& start) { - auto begin = tokens_iterator(start, this); + auto ring_range(const token& start, bool include_min = false) { + auto begin = tokens_iterator(start, this, include_min); auto end = tokens_end(); return boost::make_iterator_range(begin, end); } diff --git a/query-request.hh b/query-request.hh index 55af1a2c20..c2ae3fb832 100644 --- a/query-request.hh +++ b/query-request.hh @@ -41,6 +41,37 @@ public: , _singular(true) { } range() : range({}, {}) {} +private: + // the point is before the range (works only for non wrapped ranges) + template + bool before(const T& point, Comparator&& cmp) const { + if (!_start) { + return false; //open start, no points before + } + auto r = cmp(point, start_value()); + if (r < 0) { + return true; + } + if (!_start->is_inclusive() && r == 0) { + return true; + } + return false; + } + // the point is after the range (works only for non wrapped ranges) + template + bool after(const T& point, Comparator&& cmp) const { + if (!_end) { + return false; //open end, no points after + } + auto r = cmp(end_value(), point); + if (r < 0) { + return true; + } + if (!_end->is_inclusive() && r == 0) { + return true; + } + return false; + } public: static range make(bound start, bound end) { return range({std::move(start)}, {std::move(end)}); @@ -74,15 +105,40 @@ public: const T& end_value() 
const { return _end->value(); } - const optional& start() const { return _start; } - const optional& end() const { return _end; } - + // end is smaller than start + template + bool is_wrap_around(Comparator&& cmp) const { + if (_end && _start) { + return cmp(end_value(), start_value()) < 0; + } else { + return false; // open ended range never wraps around + } + } + // the point is inside the range + template + bool contains(const T& point, Comparator&& cmp) const { + if (is_wrap_around(cmp)) { + // wrapped range contains point if reverse does not contain it + return !range::make({end_value(), !_end->is_inclusive()}, {start_value(), !_start->is_inclusive()}).contains(point, cmp); + } else { + return !before(point, cmp) && !after(point, cmp); + } + } + // split range in two around a split_point. split_point has to be inside the range + // split_point will belong to first range + template + std::pair, range> split(const T& split_point, Comparator&& cmp) const { + assert(contains(split_point, std::forward(cmp))); + range left(_start, bound(split_point)); + range right(bound(split_point, false), _end); + return std::make_pair(std::move(left), std::move(right)); + } // Transforms this range into a new range of a different value type // Supplied transformer should transform value of type T (the old type) into value of type U (the new type). 
template diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 6895f3a57e..472d32b50e 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1076,7 +1076,7 @@ future<> storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type r auto all = boost::range::join(local, dc_groups); // OK, now send and/or apply locally - return parallel_for_each(all.begin(), all.end(), [response_id, &m, this] (typename decltype(dc_groups)::value_type& dc_targets) mutable { + return parallel_for_each(all.begin(), all.end(), [response_id, &m, this] (typename decltype(dc_groups)::value_type& dc_targets) { auto my_address = utils::fb_utilities::get_broadcast_address(); auto& forward = dc_targets.second; @@ -1089,9 +1089,10 @@ future<> storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type r got_response(response_id, my_address); }); } else { + auto response_id_ = response_id; // make local copy since capture is reused auto& ms = net::get_local_messaging_service(); return ms.send_message_oneway(net::messaging_verb::MUTATION, net::messaging_service::shard_id{coordinator, 0}, m, std::move(forward), std::move(my_address), - engine().cpu_id(), std::move(response_id)); + engine().cpu_id(), std::move(response_id_)); } }).finally([mptr] { // make mutation alive until it is sent or processed locally, otherwise it @@ -1356,53 +1357,124 @@ public: digest_mismatch_exception() : std::runtime_error("Digest mismatch") {} }; -class abstract_read_executor : public enable_shared_from_this { - using targets_iterator = std::vector::iterator; +class read_timeout_exception : public std::runtime_error { +public: + read_timeout_exception() : std::runtime_error("Read operation timed out") {} +}; - storage_proxy& _proxy; - keyspace& _ks; - lw_shared_ptr _cmd; - query::partition_range _partition_range; +class abstract_read_resolver { +protected: + size_t _targets_count; + promise<> _done_promise; // all target responded + +public: + 
abstract_read_resolver(size_t target_count) : _targets_count(target_count) {} + virtual ~abstract_read_resolver() {}; + future<> done() { + return _done_promise.get_future(); + } +}; + +class digest_read_resolver : public abstract_read_resolver { db::consistency_level _cl; - std::vector _targets; - promise>> _result_promise; - bool _done = false; // becomes true when promise above is fulfilled (operation may still continue) + size_t _block_for; + size_t _cl_responses = 0; + promise<> _cl_promise; // cl is reached std::vector>> _data_results; std::vector _digest_results; - size_t _responses = 0; - bool digests_match() { + + bool digests_match() const { assert(_digest_results.size()); auto& first = *_digest_results.begin(); return std::find_if(_digest_results.begin() + 1, _digest_results.end(), [&first] (query::result_digest digest) { return digest != first; }) == _digest_results.end(); } -protected: +public: + digest_read_resolver(db::consistency_level cl, size_t block_for, size_t targets_count) : abstract_read_resolver(targets_count), _cl(cl), _block_for(block_for) {} + void add_data(gms::inet_address from, foreign_ptr> result) { + _digest_results.emplace_back(result->digest()); + _data_results.emplace_back(std::move(result)); + got_response(from); + } + void add_digest(gms::inet_address from, query::result_digest digest) { + _digest_results.emplace_back(std::move(digest)); + got_response(from); + } + foreign_ptr> resolve() { + assert(_data_results.size()); + if (!digests_match()) { + throw digest_mismatch_exception(); + } + return std::move(*_data_results.begin()); + } bool waiting_for(gms::inet_address ep) { return db::is_datacenter_local(_cl) ? 
is_me(ep) || db::is_local(ep) : true; } void got_response(gms::inet_address ep) { - if (!_done) { + if (_cl_responses < _block_for) { if (waiting_for(ep)) { - _responses++; + _cl_responses++; } - if (_responses >= db::block_for(_ks, _cl) && _data_results.size()) { - if (!digests_match()) { - throw digest_mismatch_exception(); - } else { - _result_promise.set_value(std::move(*_data_results.begin())); - } - _done = true; - } - } else if (_digest_results.size() == _targets.size()) { - if (!digests_match()) { - throw digest_mismatch_exception(); + if (_cl_responses == _block_for && _data_results.size()) { + _cl_promise.set_value(); } } + if (_digest_results.size() == _targets_count) { + _done_promise.set_value(); + } } + future<> has_cl() { + return _cl_promise.get_future(); + } +}; + +class data_read_resolver : public abstract_read_resolver { + std::unordered_map>> _data_results; + public: - abstract_read_executor(storage_proxy& proxy, keyspace& ks, lw_shared_ptr cmd, query::partition_range pr, db::consistency_level cl, std::vector targets) : - _proxy(proxy), _ks(ks), _cmd(std::move(cmd)), _partition_range(std::move(pr)), _cl(cl), _targets(std::move(targets)) {} + data_read_resolver(size_t targets_count) : abstract_read_resolver(targets_count) {} + void add_mutate_data(gms::inet_address from, foreign_ptr> result) { + _data_results[from] = std::move(result); + if (_data_results.size() == _targets_count) { + _done_promise.set_value(); + } + } + foreign_ptr> resolve() { + assert(_data_results.size()); + return std::move((*_data_results.begin()).second); + } +}; + +class abstract_read_executor : public enable_shared_from_this { +protected: + using targets_iterator = std::vector::iterator; + using digest_resolver_ptr = ::shared_ptr; + using data_resolver_ptr = ::shared_ptr; + + storage_proxy& _proxy; + lw_shared_ptr _cmd; + query::partition_range _partition_range; + db::consistency_level _cl; + size_t _block_for; + std::vector _targets; + promise>> _result_promise; + 
+public: + abstract_read_executor(storage_proxy& proxy, lw_shared_ptr cmd, query::partition_range pr, db::consistency_level cl, size_t block_for, + std::vector targets) : + _proxy(proxy), _cmd(std::move(cmd)), _partition_range(std::move(pr)), _cl(cl), _block_for(block_for), _targets(std::move(targets)) {} virtual ~abstract_read_executor() {}; +protected: + future>> make_mutation_data_request(gms::inet_address ep) { + if (is_me(ep)) { + return _proxy.query_mutations_locally(_cmd, _partition_range); + } else { + auto& ms = net::get_local_messaging_service(); + return ms.send_message(net::messaging_verb::READ_MUTATION_DATA, net::messaging_service::shard_id{ep, 0}, *_cmd, _partition_range).then([this](reconcilable_result&& result) { + return make_foreign(::make_lw_shared(std::move(result))); + }); + } + } future>> make_data_request(gms::inet_address ep) { if (is_me(ep)) { return _proxy.query_singular_local(_cmd, _partition_range); @@ -1421,54 +1493,114 @@ public: return ms.send_message(net::messaging_verb::READ_DIGEST, net::messaging_service::shard_id{ep, 0}, *_cmd, _partition_range); } } - future<> make_data_requests(targets_iterator begin, targets_iterator end) { - return parallel_for_each(begin, end, [this] (gms::inet_address ep) { - return make_data_request(ep).then([exec = shared_from_this(), ep] (foreign_ptr> result) { - exec->_digest_results.emplace_back(std::move(result->digest())); - exec->_data_results.emplace_back(std::move(result)); - exec->got_response(ep); + future<> make_mutation_data_requests(data_resolver_ptr resolver, targets_iterator begin, targets_iterator end) { + return parallel_for_each(begin, end, [this, resolver = std::move(resolver)] (gms::inet_address ep) { + return make_mutation_data_request(ep).then([resolver, ep] (foreign_ptr> result) { + resolver->add_mutate_data(ep, std::move(result)); }); }); } - future<> make_digest_requests(targets_iterator begin, targets_iterator end) { - return parallel_for_each(begin, end, [this] (gms::inet_address 
ep) { - return make_digest_request(ep).then([exec = shared_from_this(), ep] (query::result_digest&& digest) { - exec->_digest_results.emplace_back(std::move(digest)); - exec->got_response(ep); + future<> make_data_requests(digest_resolver_ptr resolver, targets_iterator begin, targets_iterator end) { + return parallel_for_each(begin, end, [this, resolver = std::move(resolver)] (gms::inet_address ep) { + return make_data_request(ep).then([resolver, ep] (foreign_ptr> result) { + resolver->add_data(ep, std::move(result)); }); }); } - virtual future>> execute() { - when_all(make_data_requests(_targets.begin(), _targets.begin() + 1), - _targets.size() > 1 ? make_digest_requests(_targets.begin() + 1, _targets.end()) : make_ready_future()).discard_result().then_wrapped([this](future<>&& f) { + future<> make_digest_requests(digest_resolver_ptr resolver, targets_iterator begin, targets_iterator end) { + return parallel_for_each(begin, end, [this, resolver = std::move(resolver)] (gms::inet_address ep) { + return make_digest_request(ep).then([resolver, ep] (query::result_digest&& digest) { + resolver->add_digest(ep, std::move(digest)); + }); + }); + } + virtual future<> make_requests(digest_resolver_ptr resolver) { + return when_all(make_data_requests(resolver, _targets.begin(), _targets.begin() + 1), + _targets.size() > 1 ? make_digest_requests(resolver, _targets.begin() + 1, _targets.end()) : make_ready_future()).discard_result(); + } + void reconciliate() { + data_resolver_ptr data_resolver = ::make_shared(_targets.size()); + auto exec = shared_from_this(); + + make_mutation_data_requests(data_resolver, _targets.begin(), _targets.end()).finally([exec]{}); + + data_resolver->done().then_wrapped([exec, data_resolver] (future<>&& f){ try { f.get(); - } catch(...) 
{ - if (!_done) { - _result_promise.set_exception(std::current_exception()); - } + auto rr = data_resolver->resolve(); // reconciliation happens here + schema_ptr s = exec->_proxy._db.local().find_schema(exec->_cmd->cf_id); + auto result = ::make_foreign(::make_lw_shared(to_data_query_result(*rr, std::move(s), exec->_cmd->slice))); + exec->_result_promise.set_value(std::move(result)); + } catch(read_timeout_exception& ex) { + exec->_result_promise.set_exception(std::current_exception()); } }); + } +public: + virtual future>> execute() { + digest_resolver_ptr digest_resolver = ::make_shared(_cl, _block_for, _targets.size()); + auto exec = shared_from_this(); + + make_requests(digest_resolver).finally([exec]() { + // hold on to executor until all queries are complete + }); + + digest_resolver->has_cl().then_wrapped([exec, digest_resolver] (future<>&& f) { + try { + f.get(); + exec->_result_promise.set_value(digest_resolver->resolve()); + auto done = digest_resolver->done(); + if (exec->_block_for < exec->_targets.size()) { // if there are more targets then needed for cl, check digest in background + done.then_wrapped([exec, digest_resolver] (future<>&& f){ + try { + f.get(); + digest_resolver->resolve(); + } catch(digest_mismatch_exception& ex) { + // FIXME: do read repair here + } catch(...) 
{ + // ignore all exception besides digest mismatch during background check + } + }); + } else { + done.discard_result(); // no need for background check, discard done future explicitly + } + } catch (digest_mismatch_exception& ex) { + exec->reconciliate(); + } catch(read_timeout_exception& ex) { + exec->_result_promise.set_exception(std::current_exception()); + } + }); + return _result_promise.get_future(); } }; class never_speculating_read_executor : public abstract_read_executor { public: - never_speculating_read_executor(storage_proxy& proxy, keyspace& ks, lw_shared_ptr cmd, query::partition_range pr, db::consistency_level cl, std::vector targets) : - abstract_read_executor(proxy, ks, std::move(cmd), std::move(pr), cl, std::move(targets)) {} + never_speculating_read_executor(storage_proxy& proxy, lw_shared_ptr cmd, query::partition_range pr, db::consistency_level cl, size_t block_for, std::vector targets) : + abstract_read_executor(proxy, std::move(cmd), std::move(pr), cl, block_for, std::move(targets)) {} }; class always_speculating_read_executor : public abstract_read_executor { public: - always_speculating_read_executor(storage_proxy& proxy, keyspace& ks, lw_shared_ptr cmd, query::partition_range pr, db::consistency_level cl, std::vector targets) : - abstract_read_executor(proxy, ks, std::move(cmd), std::move(pr), cl, std::move(targets)) {} + always_speculating_read_executor(storage_proxy& proxy, lw_shared_ptr cmd, query::partition_range pr, db::consistency_level cl, size_t block_for, std::vector targets) : + abstract_read_executor(proxy,std::move(cmd), std::move(pr), cl, block_for, std::move(targets)) {} }; class speculating_read_executor : public abstract_read_executor { public: - speculating_read_executor(storage_proxy& proxy, keyspace& ks, lw_shared_ptr cmd, query::partition_range pr, db::consistency_level cl, std::vector targets) : - abstract_read_executor(proxy, ks, std::move(cmd), std::move(pr), cl, std::move(targets)) {} + 
speculating_read_executor(storage_proxy& proxy, lw_shared_ptr cmd, query::partition_range pr, db::consistency_level cl, size_t block_for, std::vector targets) : + abstract_read_executor(proxy, std::move(cmd), std::move(pr), cl, block_for, std::move(targets)) {} +}; + +class range_slice_read_executor : public abstract_read_executor { +public: + range_slice_read_executor(storage_proxy& proxy, lw_shared_ptr cmd, query::partition_range pr, db::consistency_level cl, std::vector targets) : + abstract_read_executor(proxy, std::move(cmd), std::move(pr), cl, targets.size(), std::move(targets)) {} + virtual future>> execute() override { + reconciliate(); + return _result_promise.get_future(); + } }; enum class speculative_retry_type { @@ -1497,16 +1629,17 @@ enum class speculative_retry_type { #endif speculative_retry_type retry_type = speculative_retry_type::NONE;//cfs.metadata.getSpeculativeRetry().type; + size_t block_for = db::block_for(ks, cl); // Speculative retry is disabled *OR* there are simply no extra replicas to speculate. - if (retry_type == speculative_retry_type::NONE || block_for(ks, cl) == all_replicas.size()) { - return ::make_shared(*this, ks, cmd, std::move(pr), cl, std::move(target_replicas)); + if (retry_type == speculative_retry_type::NONE || db::block_for(ks, cl) == all_replicas.size()) { + return ::make_shared(*this, cmd, std::move(pr), cl, block_for, std::move(target_replicas)); } if (target_replicas.size() == all_replicas.size()) { // CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC. // We are going to contact every node anyway, so ask for 2 full data requests instead of 1, for redundancy // (same amount of requests in total, but we turn 1 digest request into a full blown data request). - return ::make_shared(/*cfs, */*this, ks, cmd, std::move(pr), cl, std::move(target_replicas)); + return ::make_shared(/*cfs, */*this, cmd, std::move(pr), cl, block_for, std::move(target_replicas)); } // RRD.NONE or RRD.DC_LOCAL w/ multiple DCs. 
@@ -1522,9 +1655,9 @@ enum class speculative_retry_type { target_replicas.push_back(extra_replica); if (retry_type == speculative_retry_type::ALWAYS) { - return ::make_shared(/*cfs,*/ *this, ks, cmd, std::move(pr), cl, std::move(target_replicas)); + return ::make_shared(/*cfs,*/ *this, cmd, std::move(pr), cl, block_for, std::move(target_replicas)); } else {// PERCENTILE or CUSTOM. - return ::make_shared(/*cfs,*/ *this, ks, cmd, std::move(pr), cl, std::move(target_replicas)); + return ::make_shared(/*cfs,*/ *this, cmd, std::move(pr), cl, block_for, std::move(target_replicas)); } } @@ -1569,6 +1702,132 @@ storage_proxy::query_singular(lw_shared_ptr cmd, std::vecto }); } +future>>> +storage_proxy::query_partition_key_range_concurrent(std::vector>>&& results, + lw_shared_ptr cmd, db::consistency_level cl, std::vector::iterator&& i, + std::vector&& ranges, int concurrency_factor) { + schema_ptr schema = _db.local().find_schema(cmd->cf_id); + keyspace& ks = _db.local().find_keyspace(schema->ks_name()); + std::vector<::shared_ptr> exec; + auto concurrent_fetch_starting_index = i; + + while (i != ranges.end() && std::distance(i, concurrent_fetch_starting_index) < concurrency_factor) { + query::partition_range& range = *i; + std::vector live_endpoints = get_live_sorted_endpoints(ks, range.end_value().token()); + std::vector filtered_endpoints = filter_for_query(cl, ks, live_endpoints); + ++i; + + // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take + // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges + // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand. 
+ while (i != ranges.end()) + { + auto next_range = i; + std::vector next_endpoints = get_live_sorted_endpoints(ks, next_range->end_value().token()); + std::vector next_filtered_endpoints = filter_for_query(cl, ks, next_endpoints); + + // Origin has this to say here: + // * If the current range right is the min token, we should stop merging because CFS.getRangeSlice + // * don't know how to deal with a wrapping range. + // * Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwraps + // * the range if necessary and deal with it. However, we can't start sending wrapped range without breaking + // * wire compatibility, so It's likely easier not to bother; + // It obviously not apply for us(?), but lets follow origin for now + if (range.end_value().token() == dht::minimum_token()) { + break; + } + + std::vector merged = intersection(live_endpoints, next_endpoints); + + // Check if there is enough endpoint for the merge to be possible. + if (!is_sufficient_live_nodes(cl, ks, merged)) { + break; + } + + std::vector filtered_merged = filter_for_query(cl, ks, merged); + + // Estimate whether merging will be a win or not + if (!locator::i_endpoint_snitch::get_local_snitch_ptr()->is_worth_merging_for_range_query(filtered_merged, filtered_endpoints, next_filtered_endpoints)) { + break; + } + + // If we get there, merge this range and the next one + range = query::partition_range(range.start(), next_range->end()); + live_endpoints = std::move(merged); + filtered_endpoints = std::move(filtered_merged); + ++i; + } + db::assure_sufficient_live_nodes(cl, ks, filtered_endpoints); + exec.push_back(::make_shared(*this, cmd, std::move(range), cl, std::move(filtered_endpoints))); + } + + query::result_merger merger; + merger.reserve(exec.size()); + + auto f = ::map_reduce(exec.begin(), exec.end(), [this, cmd] (::shared_ptr& rex) { + return rex->execute(); + }, std::move(merger)); + + return f.then([this, exec = std::move(exec), results = 
std::move(results), i = std::move(i), ranges = std::move(ranges), cl, cmd, concurrency_factor] + (foreign_ptr>&& result) mutable { + results.emplace_back(std::move(result)); + if (i == ranges.end()) { + return make_ready_future>>>(std::move(results)); + } else { + return query_partition_key_range_concurrent(std::move(results), cmd, cl, std::move(i), std::move(ranges), concurrency_factor); + } + }); +} + +future>> +storage_proxy::query_partition_key_range(lw_shared_ptr cmd, query::partition_range&& range, db::consistency_level cl) { + schema_ptr schema = _db.local().find_schema(cmd->cf_id); + keyspace& ks = _db.local().find_keyspace(schema->ks_name()); + std::vector ranges; + + // when dealing with LocalStrategy keyspaces, we can skip the range splitting and merging (which can be + // expensive in clusters with vnodes) + if (ks.get_replication_strategy().get_type() == locator::replication_strategy_type::local) { + ranges.emplace_back(std::move(range)); + } else { + ranges = get_restricted_ranges(ks, *schema, std::move(range)); + } + + // our estimate of how many result rows there will be per-range + float result_rows_per_range = estimate_result_rows_per_range(cmd, ks); + // underestimate how many rows we will get per-range in order to increase the likelihood that we'll + // fetch enough rows in the first round + result_rows_per_range -= result_rows_per_range * CONCURRENT_SUBREQUESTS_MARGIN; + int concurrency_factor = result_rows_per_range == 0.0 ? 
1 : std::max(1, std::min(int(ranges.size()), int(std::ceil(cmd->row_limit / result_rows_per_range)))); + + std::vector>> results; + results.reserve(ranges.size()/concurrency_factor + 1); + + return query_partition_key_range_concurrent(std::move(results), cmd, cl, ranges.begin(), std::move(ranges), concurrency_factor) + .then([](std::vector>> results) { + query::result_merger merger; + merger.reserve(results.size()); + + for (auto&& r: results) { + merger(std::move(r)); + } + + return merger.get(); + }); +} + +future>> +storage_proxy::query_local(lw_shared_ptr cmd, std::vector&& partition_ranges) { + // FIXME: Respect cmd->row_limit to avoid unnecessary transfer + query::result_merger merger; + merger.reserve(smp::count); + return _db.map_reduce(std::move(merger), [cmd, partition_ranges = std::move(partition_ranges)] (database& db) { + return db.query(*cmd, partition_ranges).then([] (auto&& f) { + return make_foreign(std::move(f)); + }); + }).finally([cmd] {}); +} + future>> storage_proxy::query(schema_ptr s, lw_shared_ptr cmd, std::vector&& partition_ranges, db::consistency_level cl) { static auto make_empty = [] { @@ -1587,25 +1846,12 @@ storage_proxy::query(schema_ptr s, lw_shared_ptr cmd, std:: } } - return do_with(std::move(partition_ranges), [this, s, cmd](std::vector& prs) { - return map_reduce(prs.begin(), prs.end(), [this, s, cmd](const query::partition_range& pr) { - return query_mutations_locally(cmd, pr).then([s, cmd](auto&& result_ptr) { - return make_foreign(make_lw_shared(to_data_query_result(*result_ptr, s, cmd->slice))); - }); - }, query::result_merger()); - }); -} + if (partition_ranges.size() != 1) { + // FIXME: implement + throw std::runtime_error("more than one non singular range not supported yet"); + } -future>> -storage_proxy::query_local(lw_shared_ptr cmd, std::vector&& partition_ranges) { - // FIXME: Respect cmd->row_limit to avoid unnecessary transfer - query::result_merger merger; - merger.reserve(smp::count); - return 
_db.map_reduce(std::move(merger), [cmd, partition_ranges = std::move(partition_ranges)] (database& db) { - return db.query(*cmd, partition_ranges).then([] (auto&& f) { - return make_foreign(std::move(f)); - }); - }).finally([cmd] {}); + return query_partition_key_range(cmd, std::move(partition_ranges[0]), cl); } #if 0 @@ -1966,60 +2212,60 @@ std::vector storage_proxy::get_live_sorted_endpoints(keyspace return eps; } -#if 0 - private static List intersection(List l1, List l2) - { - // Note: we don't use Guava Sets.intersection() for 3 reasons: - // 1) retainAll would be inefficient if l1 and l2 are large but in practice both are the replicas for a range and - // so will be very small (< RF). In that case, retainAll is in fact more efficient. - // 2) we do ultimately need a list so converting everything to sets don't make sense - // 3) l1 and l2 are sorted by proximity. The use of retainAll maintain that sorting in the result, while using sets wouldn't. - List inter = new ArrayList(l1); - inter.retainAll(l2); - return inter; - } +std::vector storage_proxy::intersection(const std::vector& l1, const std::vector& l2) { + std::vector inter; + inter.reserve(l1.size()); + std::remove_copy_if(l1.begin(), l1.end(), std::back_inserter(inter), [&l2] (const gms::inet_address& a) { + return std::find(l2.begin(), l2.end(), a) == l2.end(); + }); + return inter; +} - /** - * Estimate the number of result rows (either cql3 rows or storage rows, as called for by the command) per - * range in the ring based on our local data. This assumes that ranges are uniformly distributed across the cluster - * and that the queried data is also uniformly distributed. - */ - private static float estimateResultRowsPerRange(AbstractRangeCommand command, Keyspace keyspace) +/** + * Estimate the number of result rows (either cql3 rows or storage rows, as called for by the command) per + * range in the ring based on our local data. 
This assumes that ranges are uniformly distributed across the cluster + * and that the queried data is also uniformly distributed. + */ +float storage_proxy::estimate_result_rows_per_range(lw_shared_ptr cmd, keyspace& ks) +{ + return 1.0; +#if 0 + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.columnFamily); + float resultRowsPerRange = Float.POSITIVE_INFINITY; + if (command.rowFilter != null && !command.rowFilter.isEmpty()) { - ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.columnFamily); - float resultRowsPerRange = Float.POSITIVE_INFINITY; - if (command.rowFilter != null && !command.rowFilter.isEmpty()) - { - List searchers = cfs.indexManager.getIndexSearchersForQuery(command.rowFilter); - if (searchers.isEmpty()) - { - resultRowsPerRange = calculateResultRowsUsingEstimatedKeys(cfs); - } - else - { - // Secondary index query (cql3 or otherwise). Estimate result rows based on most selective 2ary index. - for (SecondaryIndexSearcher searcher : searchers) - { - // use our own mean column count as our estimate for how many matching rows each node will have - SecondaryIndex highestSelectivityIndex = searcher.highestSelectivityIndex(command.rowFilter); - resultRowsPerRange = Math.min(resultRowsPerRange, highestSelectivityIndex.estimateResultRows()); - } - } - } - else if (!command.countCQL3Rows()) - { - // non-cql3 query - resultRowsPerRange = cfs.estimateKeys(); - } - else + List searchers = cfs.indexManager.getIndexSearchersForQuery(command.rowFilter); + if (searchers.isEmpty()) { resultRowsPerRange = calculateResultRowsUsingEstimatedKeys(cfs); } - - // adjust resultRowsPerRange by the number of tokens this node has and the replication factor for this ks - return (resultRowsPerRange / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor(); + else + { + // Secondary index query (cql3 or otherwise). Estimate result rows based on most selective 2ary index. 
+ for (SecondaryIndexSearcher searcher : searchers) + { + // use our own mean column count as our estimate for how many matching rows each node will have + SecondaryIndex highestSelectivityIndex = searcher.highestSelectivityIndex(command.rowFilter); + resultRowsPerRange = Math.min(resultRowsPerRange, highestSelectivityIndex.estimateResultRows()); + } + } + } + else if (!command.countCQL3Rows()) + { + // non-cql3 query + resultRowsPerRange = cfs.estimateKeys(); + } + else + { + resultRowsPerRange = calculateResultRowsUsingEstimatedKeys(cfs); } + // adjust resultRowsPerRange by the number of tokens this node has and the replication factor for this ks + return (resultRowsPerRange / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor(); +#endif +} + +#if 0 private static float calculateResultRowsUsingEstimatedKeys(ColumnFamilyStore cfs) { if (cfs.metadata.comparator.isDense()) @@ -2034,229 +2280,6 @@ std::vector storage_proxy::get_live_sorted_endpoints(keyspace } } - public static List getRangeSlice(AbstractRangeCommand command, ConsistencyLevel consistency_level) - throws UnavailableException, ReadTimeoutException - { - Tracing.trace("Computing ranges to query"); - long startTime = System.nanoTime(); - - Keyspace keyspace = Keyspace.open(command.keyspace); - List rows; - // now scan until we have enough results - try - { - int cql3RowCount = 0; - rows = new ArrayList<>(); - - // when dealing with LocalStrategy keyspaces, we can skip the range splitting and merging (which can be - // expensive in clusters with vnodes) - List> ranges; - if (keyspace.getReplicationStrategy() instanceof LocalStrategy) - ranges = command.keyRange.unwrap(); - else - ranges = getRestrictedRanges(command.keyRange); - - // our estimate of how many result rows there will be per-range - float resultRowsPerRange = estimateResultRowsPerRange(command, keyspace); - // underestimate how many rows we will get per-range in order to increase the likelihood that 
we'll - // fetch enough rows in the first round - resultRowsPerRange -= resultRowsPerRange * CONCURRENT_SUBREQUESTS_MARGIN; - int concurrencyFactor = resultRowsPerRange == 0.0 - ? 1 - : Math.max(1, Math.min(ranges.size(), (int) Math.ceil(command.limit() / resultRowsPerRange))); - logger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}", - resultRowsPerRange, command.limit(), ranges.size(), concurrencyFactor); - Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", new Object[]{ ranges.size(), concurrencyFactor, resultRowsPerRange}); - - boolean haveSufficientRows = false; - int i = 0; - AbstractBounds nextRange = null; - List nextEndpoints = null; - List nextFilteredEndpoints = null; - while (i < ranges.size()) - { - List>>> scanHandlers = new ArrayList<>(concurrencyFactor); - int concurrentFetchStartingIndex = i; - int concurrentRequests = 0; - while ((i - concurrentFetchStartingIndex) < concurrencyFactor) - { - AbstractBounds range = nextRange == null - ? ranges.get(i) - : nextRange; - List liveEndpoints = nextEndpoints == null - ? getLiveSortedEndpoints(keyspace, range.right) - : nextEndpoints; - List filteredEndpoints = nextFilteredEndpoints == null - ? consistency_level.filterForQuery(keyspace, liveEndpoints) - : nextFilteredEndpoints; - ++i; - ++concurrentRequests; - - // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take - // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges - // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand. 
- while (i < ranges.size()) - { - nextRange = ranges.get(i); - nextEndpoints = getLiveSortedEndpoints(keyspace, nextRange.right); - nextFilteredEndpoints = consistency_level.filterForQuery(keyspace, nextEndpoints); - - // If the current range right is the min token, we should stop merging because CFS.getRangeSlice - // don't know how to deal with a wrapping range. - // Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwraps - // the range if necessary and deal with it. However, we can't start sending wrapped range without breaking - // wire compatibility, so It's likely easier not to bother; - if (range.right.isMinimum()) - break; - - List merged = intersection(liveEndpoints, nextEndpoints); - - // Check if there is enough endpoint for the merge to be possible. - if (!consistency_level.isSufficientLiveNodes(keyspace, merged)) - break; - - List filteredMerged = consistency_level.filterForQuery(keyspace, merged); - - // Estimate whether merging will be a win or not - if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints)) - break; - - // If we get there, merge this range and the next one - range = range.withNewRight(nextRange.right); - liveEndpoints = merged; - filteredEndpoints = filteredMerged; - ++i; - } - - AbstractRangeCommand nodeCmd = command.forSubRange(range); - - // collect replies and resolve according to consistency level - RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp); - List minimalEndpoints = filteredEndpoints.subList(0, Math.min(filteredEndpoints.size(), consistency_level.blockFor(keyspace))); - ReadCallback> handler = new ReadCallback<>(resolver, consistency_level, nodeCmd, minimalEndpoints); - handler.assureSufficientLiveNodes(); - resolver.setSources(filteredEndpoints); - if (filteredEndpoints.size() == 1 - && 
filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress()) - && OPTIMIZE_LOCAL_REQUESTS) - { - StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler), Tracing.instance.get()); - } - else - { - MessageOut message = nodeCmd.createMessage(); - for (InetAddress endpoint : filteredEndpoints) - { - Tracing.trace("Enqueuing request to {}", endpoint); - MessagingService.instance().sendRR(message, endpoint, handler); - } - } - scanHandlers.add(Pair.create(nodeCmd, handler)); - } - Tracing.trace("Submitted {} concurrent range requests covering {} ranges", concurrentRequests, i - concurrentFetchStartingIndex); - - List repairResponses = new ArrayList<>(); - for (Pair>> cmdPairHandler : scanHandlers) - { - AbstractRangeCommand nodeCmd = cmdPairHandler.left; - ReadCallback> handler = cmdPairHandler.right; - RangeSliceResponseResolver resolver = (RangeSliceResponseResolver)handler.resolver; - - try - { - for (Row row : handler.get()) - { - rows.add(row); - if (nodeCmd.countCQL3Rows()) - cql3RowCount += row.getLiveCount(command.predicate, command.timestamp); - } - repairResponses.addAll(resolver.repairResults); - } - catch (ReadTimeoutException ex) - { - // we timed out waiting for responses - int blockFor = consistency_level.blockFor(keyspace); - int responseCount = resolver.responses.size(); - String gotData = responseCount > 0 - ? resolver.isDataPresent() ? 
" (including data)" : " (only digests)" - : ""; - - if (Tracing.isTracing()) - { - Tracing.trace("Timed out; received {} of {} responses{} for range {} of {}", - new Object[]{ responseCount, blockFor, gotData, i, ranges.size() }); - } - else if (logger.isDebugEnabled()) - { - logger.debug("Range slice timeout; received {} of {} responses{} for range {} of {}", - responseCount, blockFor, gotData, i, ranges.size()); - } - throw ex; - } - catch (DigestMismatchException e) - { - throw new AssertionError(e); // no digests in range slices yet - } - - // if we're done, great, otherwise, move to the next range - int count = nodeCmd.countCQL3Rows() ? cql3RowCount : rows.size(); - if (count >= nodeCmd.limit()) - { - haveSufficientRows = true; - break; - } - } - - try - { - FBUtilities.waitOnFutures(repairResponses, DatabaseDescriptor.getWriteRpcTimeout()); - } - catch (TimeoutException ex) - { - // We got all responses, but timed out while repairing - int blockFor = consistency_level.blockFor(keyspace); - if (Tracing.isTracing()) - Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor); - else - logger.debug("Range slice timeout while read-repairing after receiving all {} data and digest responses", blockFor); - throw new ReadTimeoutException(consistency_level, blockFor-1, blockFor, true); - } - - if (haveSufficientRows) - return trim(command, rows); - - // we didn't get enough rows in our concurrent fetch; recalculate our concurrency factor - // based on the results we've seen so far (as long as we still have ranges left to query) - if (i < ranges.size()) - { - float fetchedRows = command.countCQL3Rows() ? 
cql3RowCount : rows.size(); - float remainingRows = command.limit() - fetchedRows; - float actualRowsPerRange; - if (fetchedRows == 0.0) - { - // we haven't actually gotten any results, so query all remaining ranges at once - actualRowsPerRange = 0.0f; - concurrencyFactor = ranges.size() - i; - } - else - { - actualRowsPerRange = i / fetchedRows; - concurrencyFactor = Math.max(1, Math.min(ranges.size() - i, Math.round(remainingRows / actualRowsPerRange))); - } - logger.debug("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}", - actualRowsPerRange, (int) remainingRows, concurrencyFactor); - } - } - } - finally - { - long latency = System.nanoTime() - startTime; - rangeMetrics.addNano(latency); - Keyspace.open(command.keyspace).getColumnFamilyStore(command.columnFamily).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS); - } - return trim(command, rows); - } - private static List trim(AbstractRangeCommand command, List rows) { // When maxIsColumns, we let the caller trim the result. @@ -2344,54 +2367,53 @@ std::vector storage_proxy::get_live_sorted_endpoints(keyspace return results; } +#endif - /** - * Compute all ranges we're going to query, in sorted order. Nodes can be replica destinations for many ranges, - * so we need to restrict each scan to the specific range we want, or else we'd get duplicate results. 
- */ - static > List> getRestrictedRanges(final AbstractBounds queryRange) - { - // special case for bounds containing exactly 1 (non-minimum) token - if (queryRange instanceof Bounds && queryRange.left.equals(queryRange.right) && !queryRange.left.isMinimum()) - { - return Collections.singletonList(queryRange); - } - - TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata(); - - List> ranges = new ArrayList>(); - // divide the queryRange into pieces delimited by the ring and minimum tokens - Iterator ringIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), queryRange.left.getToken(), true); - AbstractBounds remainder = queryRange; - while (ringIter.hasNext()) - { - /* - * remainder can be a range/bounds of token _or_ keys and we want to split it with a token: - * - if remainder is tokens, then we'll just split using the provided token. - * - if remainder is keys, we want to split using token.upperBoundKey. For instance, if remainder - * is [DK(10, 'foo'), DK(20, 'bar')], and we have 3 nodes with tokens 0, 15, 30. We want to - * split remainder to A=[DK(10, 'foo'), 15] and B=(15, DK(20, 'bar')]. But since we can't mix - * tokens and keys at the same time in a range, we uses 15.upperBoundKey() to have A include all - * keys having 15 as token and B include none of those (since that is what our node owns). - * asSplitValue() abstracts that choice. - */ - Token upperBoundToken = ringIter.next(); - T upperBound = (T)upperBoundToken.upperBound(queryRange.left.getClass()); - if (!remainder.left.equals(upperBound) && !remainder.contains(upperBound)) - // no more splits - break; - Pair,AbstractBounds> splits = remainder.split(upperBound); - if (splits == null) - continue; - - ranges.add(splits.left); - remainder = splits.right; - } - ranges.add(remainder); - - return ranges; +/** + * Compute all ranges we're going to query, in sorted order. 
Nodes can be replica destinations for many ranges, + * so we need to restrict each scan to the specific range we want, or else we'd get duplicate results. + */ +std::vector +storage_proxy::get_restricted_ranges(keyspace& ks, const schema& s, query::partition_range range) { + // special case for bounds containing exactly 1 (non-minimum) token + if (range.is_singular()) { + return std::vector({std::move(range)}); } + locator::token_metadata& tm = get_local_storage_service().get_token_metadata(); + + std::vector ranges; + + // divide the queryRange into pieces delimited by the ring and minimum tokens + auto ring_iter = tm.ring_range(range.start_value().token(), true); + query::partition_range remainder = range; + for(const dht::token& upper_bound_token: ring_iter) + { + /* + * remainder can be a range/bounds of token _or_ keys and we want to split it with a token: + * - if remainder is tokens, then we'll just split using the provided token. + * - if remainder is keys, we want to split using token.upperBoundKey. For instance, if remainder + * is [DK(10, 'foo'), DK(20, 'bar')], and we have 3 nodes with tokens 0, 15, 30. We want to + * split remainder to A=[DK(10, 'foo'), 15] and B=(15, DK(20, 'bar')]. But since we can't mix + * tokens and keys at the same time in a range, we use 15.upperBoundKey() to have A include all + * keys having 15 as token and B include none of those (since that is what our node owns). + * asSplitValue() abstracts that choice. 
+ */ + dht::ring_position split_point(upper_bound_token); + if (!remainder.contains(split_point, dht::ring_position_comparator(s))) { + break; // no more splits + } + std::pair splits = remainder.split(split_point, dht::ring_position_comparator(s)); + + ranges.emplace_back(std::move(splits.first)); + remainder = std::move(splits.second); + } + ranges.emplace_back(std::move(remainder)); + + return ranges; +} + +#if 0 public long getReadOperations() { return readMetrics.latency.count(); diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 74f386dad6..3a22b61bf0 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -61,6 +61,7 @@ private: size_t _total_hints_in_progress = 0; std::unordered_map _hints_in_progress; stats _stats; + static constexpr float CONCURRENT_SUBREQUESTS_MARGIN = 0.10; private: void init_messaging_service(); future>> query_singular(lw_shared_ptr cmd, std::vector&& partition_ranges, db::consistency_level cl); @@ -81,6 +82,11 @@ private: ::shared_ptr get_read_executor(lw_shared_ptr cmd, query::partition_range pr, db::consistency_level cl); future>> query_singular_local(lw_shared_ptr cmd, const query::partition_range& pr); future query_singular_local_digest(lw_shared_ptr cmd, const query::partition_range& pr); + future>> query_partition_key_range(lw_shared_ptr cmd, query::partition_range&& range, db::consistency_level cl); + std::vector get_restricted_ranges(keyspace& ks, const schema& s, query::partition_range range); + float estimate_result_rows_per_range(lw_shared_ptr cmd, keyspace& ks); + static std::vector intersection(const std::vector& l1, const std::vector& l2); + future>>> query_partition_key_range_concurrent(std::vector>>&& results, lw_shared_ptr cmd, db::consistency_level cl, std::vector::iterator&& i, std::vector&& ranges, int concurrency_factor); public: storage_proxy(distributed& db);