Merge "Optimise paged queries" from Paweł

"
This series adds some optimisations to the paging logic that attempt to
close the performance gap between paged and non-paged queries. The
former are more complex, so they are always going to be slower, but the
performance loss was unacceptably large.

Fixes #3619.

Performance with paging:
        ./perf_paging_before  ./perf_paging_after   diff
 read              271246.13            312815.49  15.3%

Without paging:
        ./perf_nopaging_before  ./perf_nopaging_after   diff
 read                343732.17              342575.77  -0.3%

Tests: unit(release), dtests(paging_test.py, paging_additional_test.py)
"

* tag 'optimise-paging/v1' of https://github.com/pdziepak/scylla:
  cql3: select statement: don't copy metadata if not needed
  cql3: query_options: make simple getter inlineable
  cql3: metadata: avoid copying column information
  query_pager: avoid visiting result_view if not needed
  query::result_view: add get_last_partition_and_clustering_key()
  query::result_reader: fix const correctness
  tests/uuid: add more tests including make_random_uuid()
  utils: uuid: don't use std::random_device()
This commit is contained in:
Avi Kivity
2018-07-26 19:24:03 +03:00
9 changed files with 190 additions and 155 deletions

View File

@@ -135,21 +135,6 @@ query_options::query_options(std::vector<cql3::raw_value> values)
db::consistency_level::ONE, infinite_timeout_config, std::move(values))
{}
db::consistency_level query_options::get_consistency() const
{
return _consistency;
}
cql3::raw_value_view query_options::get_value_at(size_t idx) const
{
return _value_views.at(idx);
}
size_t query_options::get_values_count() const
{
return _value_views.size();
}
cql3::raw_value_view query_options::make_temporary(cql3::raw_value value) const
{
if (value) {
@@ -178,56 +163,6 @@ bytes_view query_options::linearize(fragmented_temporary_buffer::view view) cons
}
}
bool query_options::skip_metadata() const
{
return _skip_metadata;
}
int32_t query_options::get_page_size() const
{
return get_specific_options().page_size;
}
::shared_ptr<service::pager::paging_state> query_options::get_paging_state() const
{
return get_specific_options().state;
}
std::experimental::optional<db::consistency_level> query_options::get_serial_consistency() const
{
return get_specific_options().serial_consistency;
}
api::timestamp_type query_options::get_timestamp(service::query_state& state) const
{
auto tstamp = get_specific_options().timestamp;
return tstamp != api::missing_timestamp ? tstamp : state.get_timestamp();
}
int query_options::get_protocol_version() const
{
return _cql_serialization_format.protocol_version();
}
cql_serialization_format query_options::get_cql_serialization_format() const
{
return _cql_serialization_format;
}
const query_options::specific_options& query_options::get_specific_options() const
{
return _options;
}
const query_options& query_options::for_statement(size_t i) const
{
if (!_batch_options) {
// No per-statement options supplied, so use the "global" options
return *this;
}
return _batch_options->at(i);
}
void query_options::prepare(const std::vector<::shared_ptr<column_specification>>& specs)
{
if (!_names) {

View File

@@ -156,34 +156,76 @@ public:
std::vector<cql3::raw_value> values, specific_options options = specific_options::DEFAULT);
explicit query_options(std::unique_ptr<query_options>, ::shared_ptr<service::pager::paging_state> paging_state);
db::consistency_level get_consistency() const;
const timeout_config& get_timeout_config() const { return _timeout_config; }
cql3::raw_value_view get_value_at(size_t idx) const;
db::consistency_level get_consistency() const {
return _consistency;
}
cql3::raw_value_view get_value_at(size_t idx) const {
return _value_views.at(idx);
}
size_t get_values_count() const {
return _value_views.size();
}
cql3::raw_value_view make_temporary(cql3::raw_value value) const;
bytes_view linearize(fragmented_temporary_buffer::view) const;
size_t get_values_count() const;
bool skip_metadata() const;
/** The pageSize for this query. Will be <= 0 if not relevant for the query. */
int32_t get_page_size() const;
bool skip_metadata() const {
return _skip_metadata;
}
int32_t get_page_size() const {
return get_specific_options().page_size;
}
/** The paging state for this query, or null if not relevant. */
::shared_ptr<service::pager::paging_state> get_paging_state() const;
::shared_ptr<service::pager::paging_state> get_paging_state() const {
return get_specific_options().state;
}
/** Serial consistency for conditional updates. */
std::experimental::optional<db::consistency_level> get_serial_consistency() const;
std::experimental::optional<db::consistency_level> get_serial_consistency() const {
return get_specific_options().serial_consistency;
}
api::timestamp_type get_timestamp(service::query_state& state) const {
auto tstamp = get_specific_options().timestamp;
return tstamp != api::missing_timestamp ? tstamp : state.get_timestamp();
}
/**
* The protocol version for the query. Will be 3 if the object don't come from
* a native protocol request (i.e. it's been allocated locally or by CQL-over-thrift).
*/
int get_protocol_version() const {
return _cql_serialization_format.protocol_version();
}
cql_serialization_format get_cql_serialization_format() const {
return _cql_serialization_format;
}
const query_options::specific_options& get_specific_options() const {
return _options;
}
// Mainly for the sake of BatchQueryOptions
const query_options& for_statement(size_t i) const {
if (!_batch_options) {
// No per-statement options supplied, so use the "global" options
return *this;
}
return _batch_options->at(i);
}
const std::experimental::optional<std::vector<sstring_view>>& get_names() const noexcept {
return _names;
}
api::timestamp_type get_timestamp(service::query_state& state) const;
/**
* The protocol version for the query. Will be 3 if the object don't come from
* a native protocol request (i.e. it's been allocated locally or by CQL-over-thrift).
*/
int get_protocol_version() const;
cql_serialization_format get_cql_serialization_format() const;
// Mainly for the sake of BatchQueryOptions
const specific_options& get_specific_options() const;
const query_options& for_statement(size_t i) const;
void prepare(const std::vector<::shared_ptr<column_specification>>& specs);
private:
void fill_value_views();

View File

@@ -45,27 +45,25 @@ namespace cql3 {
metadata::metadata(std::vector<::shared_ptr<column_specification>> names_)
: _flags(flag_enum_set())
, names(std::move(names_)) {
_column_count = names.size();
}
, _column_info(make_lw_shared<column_info>(std::move(names_)))
{ }
metadata::metadata(flag_enum_set flags, std::vector<::shared_ptr<column_specification>> names_, uint32_t column_count,
::shared_ptr<const service::pager::paging_state> paging_state)
: _flags(flags)
, names(std::move(names_))
, _column_count(column_count)
, _column_info(make_lw_shared<column_info>(std::move(names_), column_count))
, _paging_state(std::move(paging_state))
{ }
// The maximum number of values that the ResultSet can hold. This can be bigger than columnCount due to CASSANDRA-4911
uint32_t metadata::value_count() const {
return _flags.contains<flag::NO_METADATA>() ? _column_count : names.size();
return _flags.contains<flag::NO_METADATA>() ? _column_info->_column_count : _column_info->_names.size();
}
void metadata::add_non_serialized_column(::shared_ptr<column_specification> name) {
// See comment above. Because columnCount doesn't account the newly added name, it
// won't be serialized.
names.emplace_back(std::move(name));
_column_info->_names.emplace_back(std::move(name));
}
bool metadata::all_in_same_cf() const {
@@ -73,7 +71,7 @@ bool metadata::all_in_same_cf() const {
return false;
}
return column_specification::all_in_same_table(names);
return column_specification::all_in_same_table(_column_info->_names);
}
void metadata::set_has_more_pages(::shared_ptr<const service::pager::paging_state> paging_state) {
@@ -93,18 +91,10 @@ metadata::flag_enum_set metadata::flags() const {
return _flags;
}
uint32_t metadata::column_count() const {
return _column_count;
}
::shared_ptr<const service::pager::paging_state> metadata::paging_state() const {
return _paging_state;
}
const std::vector<::shared_ptr<column_specification>>& metadata::get_names() const {
return names;
}
prepared_metadata::prepared_metadata(const std::vector<::shared_ptr<column_specification>>& names,
const std::vector<uint16_t>& partition_key_bind_indices)
: _names{names}

View File

@@ -70,18 +70,29 @@ public:
using flag_enum_set = enum_set<flag_enum>;
private:
flag_enum_set _flags;
public:
struct column_info {
// Please note that columnCount can actually be smaller than names, even if names is not null. This is
// used to include columns in the resultSet that we need to do post-query re-orderings
// (SelectStatement.orderResults) but that shouldn't be sent to the user as they haven't been requested
// (CASSANDRA-4911). So the serialization code will exclude any columns in name whose index is >= columnCount.
std::vector<::shared_ptr<column_specification>> names;
std::vector<::shared_ptr<column_specification>> _names;
uint32_t _column_count;
column_info(std::vector<::shared_ptr<column_specification>> names, uint32_t column_count)
: _names(std::move(names))
, _column_count(column_count)
{ }
explicit column_info(std::vector<::shared_ptr<column_specification>> names)
: _names(std::move(names))
, _column_count(_names.size())
{ }
};
private:
flag_enum_set _flags;
private:
uint32_t _column_count;
lw_shared_ptr<column_info> _column_info;
::shared_ptr<const service::pager::paging_state> _paging_state;
public:
@@ -105,11 +116,13 @@ public:
flag_enum_set flags() const;
uint32_t column_count() const;
uint32_t column_count() const { return _column_info->_column_count; }
::shared_ptr<const service::pager::paging_state> paging_state() const;
const std::vector<::shared_ptr<column_specification>>& get_names() const;
const std::vector<::shared_ptr<column_specification>>& get_names() const {
return _column_info->_names;
}
};
::shared_ptr<const cql3::metadata> make_empty_metadata();
@@ -223,14 +236,14 @@ public:
class result {
std::unique_ptr<cql3::result_set> _result_set;
result_generator _result_generator;
shared_ptr<cql3::metadata> _metadata;
shared_ptr<const cql3::metadata> _metadata;
public:
explicit result(std::unique_ptr<cql3::result_set> rs)
: _result_set(std::move(rs))
, _metadata(_result_set->_metadata)
{ }
explicit result(result_generator generator, shared_ptr<metadata> m)
explicit result(result_generator generator, shared_ptr<const metadata> m)
: _result_generator(std::move(generator))
, _metadata(std::move(m))
{ }
@@ -240,7 +253,7 @@ public:
if (_result_set) {
return *_result_set;
} else {
auto builder = result_set::builder(_metadata);
auto builder = result_set::builder(make_shared<cql3::metadata>(*_metadata));
_result_generator.visit(builder);
return std::move(builder).get_result_set();
}

View File

@@ -442,10 +442,15 @@ select_statement::do_execute(service::storage_proxy& proxy,
if (_selection->is_trivial() && !_restrictions->need_filtering()) {
return p->fetch_page_generator(page_size, now, _stats).then([this, p, limit] (result_generator generator) {
auto meta = make_shared<metadata>(*_selection->get_result_metadata());
if (!p->is_exhausted()) {
meta->set_has_more_pages(p->state());
}
auto meta = [&] () -> shared_ptr<const cql3::metadata> {
if (!p->is_exhausted()) {
auto meta = make_shared<metadata>(*_selection->get_result_metadata());
meta->set_has_more_pages(p->state());
return meta;
} else {
return _selection->get_result_metadata();
}
}();
return shared_ptr<cql_transport::messages::result_message>(
make_shared<cql_transport::messages::result_message::rows>(result(std::move(generator), std::move(meta)))

View File

@@ -156,6 +156,7 @@ class result_view {
public:
result_view(const bytes_ostream& v) : _v(ser::query_result_view{ser::as_input_stream(v)}) {}
result_view(ser::query_result_view v) : _v(v) {}
explicit result_view(const query::result& res) : result_view(res.buf()) { }
template <typename Func>
static auto do_with(const query::result& res, Func&& func) {
@@ -165,14 +166,12 @@ public:
template <typename ResultVisitor>
static void consume(const query::result& res, const partition_slice& slice, ResultVisitor&& visitor) {
do_with(res, [&] (result_view v) {
v.consume(slice, visitor);
});
result_view(res).consume(slice, visitor);
}
template <typename Visitor>
GCC6_CONCEPT(requires ResultVisitor<Visitor>)
void consume(const partition_slice& slice, Visitor&& visitor) {
void consume(const partition_slice& slice, Visitor&& visitor) const {
for (auto&& p : _v.partitions()) {
auto rows = p.rows();
auto row_count = rows.size();
@@ -198,13 +197,21 @@ public:
}
}
std::tuple<uint32_t, uint32_t> count_partitions_and_rows() {
std::tuple<uint32_t, uint32_t> count_partitions_and_rows() const {
auto&& ps = _v.partitions();
auto rows = boost::accumulate(ps | boost::adaptors::transformed([] (auto& p) {
return std::max(p.rows().size(), size_t(1));
}), uint32_t(0));
return std::make_tuple(ps.size(), rows);
}
// Returns the key of the last partition in this result together with the
// key of the last row of that partition. The second tuple element is a
// disengaged optional when the last partition has no rows.
std::tuple<partition_key, stdx::optional<clustering_key>>
get_last_partition_and_clustering_key() const {
auto ps = _v.partitions();
// NOTE(review): back() is called without an emptiness check, so this
// presumably requires the result to hold at least one partition; confirm
// that every caller guarantees it.
auto& p = ps.back();
auto rs = p.rows();
// Only look at the last row when the partition actually has rows.
return { p.key().value(), !rs.empty() ? rs.back().key() : stdx::optional<clustering_key>() };
}
};
}

View File

@@ -50,6 +50,14 @@ static logging::logger qlogger("paging");
namespace service::pager {
struct noop_visitor {
void accept_new_partition(uint32_t) { }
void accept_new_partition(const partition_key& key, uint32_t row_count) { }
void accept_new_row(const clustering_key& key, const query::result_row_view& static_row, const query::result_row_view& row) { }
void accept_new_row(const query::result_row_view& static_row, const query::result_row_view& row) { }
void accept_partition_end(const query::result_row_view& static_row) { }
};
static bool has_clustering_keys(const schema& s, const query::read_command& cmd) {
return s.clustering_key_size() > 0
&& !cmd.slice.options.contains<query::partition_slice::option::distinct>();
@@ -247,14 +255,6 @@ static bool has_clustering_keys(const schema& s, const query::read_command& cmd)
future<cql3::result_generator> query_pager::fetch_page_generator(uint32_t page_size, gc_clock::time_point now, cql3::cql_stats& stats) {
return do_fetch_page(page_size, now).then([this, page_size, now, &stats] (service::storage_proxy::coordinator_query_result qr) {
struct noop_visitor {
void accept_new_partition(uint32_t) { }
void accept_new_partition(const partition_key& key, uint32_t row_count) { }
void accept_new_row(const clustering_key& key, const query::result_row_view& static_row, const query::result_row_view& row) { }
void accept_new_row(const query::result_row_view& static_row, const query::result_row_view& row) { }
void accept_partition_end(const query::result_row_view& static_row) { }
};
_last_replicas = std::move(qr.last_replicas);
_query_read_repair_decision = qr.read_repair_decision;
handle_result(noop_visitor(), qr.query_result, page_size, now);
@@ -335,24 +335,47 @@ public:
const foreign_ptr<lw_shared_ptr<query::result>>& results,
uint32_t page_size, gc_clock::time_point now) {
query_result_visitor<Visitor> v(std::forward<Visitor>(visitor));
query::result_view::consume(*results, _cmd->slice, v);
if (_last_pkey) {
auto update_slice = [&] (const partition_key& last_pkey) {
// refs #752, when doing aggregate queries we will re-use same
// slice repeatedly. Since "specific ck ranges" only deal with
// a single extra range, we must clear out the old one
// Even if it was not so of course, leaving junk in the slice
// is bad.
_cmd->slice.clear_range(*_schema, *_last_pkey);
_cmd->slice.clear_range(*_schema, last_pkey);
};
auto view = query::result_view(*results);
uint32_t row_count;
if constexpr(!std::is_same_v<std::decay_t<Visitor>, noop_visitor>) {
query_result_visitor<Visitor> v(std::forward<Visitor>(visitor));
view.consume(_cmd->slice, v);
if (_last_pkey) {
update_slice(*_last_pkey);
}
row_count = v.total_rows;
_max = _max - row_count;
_exhausted = (v.total_rows < page_size && !results->is_short_read()) || _max == 0;
_last_pkey = v.last_pkey;
_last_ckey = v.last_ckey;
} else {
row_count = results->row_count() ? *results->row_count() : std::get<1>(view.count_partitions_and_rows());
_max = _max - row_count;
_exhausted = (row_count < page_size && !results->is_short_read()) || _max == 0;
if (!_exhausted) {
if (_last_pkey) {
update_slice(*_last_pkey);
}
auto [ last_pkey, last_ckey ] = view.get_last_partition_and_clustering_key();
_last_pkey = std::move(last_pkey);
_last_ckey = std::move(last_ckey);
}
}
_max = _max - v.total_rows;
_exhausted = (v.total_rows < page_size && !results->is_short_read()) || _max == 0;
_last_pkey = v.last_pkey;
_last_ckey = v.last_ckey;
qlogger.debug("Fetched {} rows, max_remain={} {}", v.total_rows, _max, _exhausted ? "(exh)" : "");
qlogger.debug("Fetched {} rows, max_remain={} {}", row_count, _max, _exhausted ? "(exh)" : "");
if (_last_pkey) {
qlogger.debug("Last partition key: {}", *_last_pkey);

View File

@@ -50,3 +50,30 @@ BOOST_AUTO_TEST_CASE(test_UUID_comparison) {
BOOST_REQUIRE_GT(p.first, p.second);
}
}
// Parses a fixed set of textual UUIDs and verifies, for each one, that the
// parsed value reports version 4, formats back to exactly the input text,
// and has the top two bits of its least significant half equal to binary 10.
BOOST_AUTO_TEST_CASE(test_from_string) {
    static const char* const samples[] = {
        "b1415756-49c3-4fa8-9b72-d1b867b032af",
        "85859d5c-fcf3-4b0b-9089-197b8b06735c",
        "e596c2f2-d29d-44a0-bb89-0a90ff928490",
        "f28f86f5-cbc2-4526-ba25-db90c226ec6a",
        "ce84997b-6ea2-4468-9f02-8a65abf4141a",
    };

    for (sstring_view sv : samples) {
        auto uuid = UUID(sv);
        BOOST_CHECK_EQUAL(uuid.version(), 4);
        BOOST_CHECK_EQUAL(uuid.to_sstring(), sv);
        BOOST_CHECK_EQUAL((uuid.get_least_significant_bits() >> 62) & 0x3, 2);
    }
}
// Generates a batch of random UUIDs, checks the version and the top two
// bits of the least significant half on each one, and finally verifies
// that no value was produced twice.
BOOST_AUTO_TEST_CASE(test_make_random_uuid) {
    constexpr int sample_count = 100;

    std::vector<UUID> uuids;
    for (int n = 0; n != sample_count; ++n) {
        auto uuid = utils::make_random_uuid();
        BOOST_CHECK_EQUAL(uuid.version(), 4);
        BOOST_CHECK_EQUAL((uuid.get_least_significant_bits() >> 62) & 0x3, 2);
        uuids.push_back(uuid);
    }

    // Sorting puts equal values next to each other, so std::unique returns
    // the end iterator only when every generated UUID is distinct.
    std::sort(uuids.begin(), uuids.end());
    BOOST_CHECK(std::unique(uuids.begin(), uuids.end()) == uuids.end());
}

View File

@@ -34,23 +34,16 @@ namespace utils {
UUID
make_random_uuid() {
// FIXME: keep in userspace
static thread_local std::random_device urandom;
static thread_local std::uniform_int_distribution<uint8_t> dist(0, 255);
union {
uint8_t b[16];
struct {
uint64_t msb, lsb;
} w;
} v;
for (auto& b : v.b) {
b = dist(urandom);
}
v.b[6] &= 0x0f;
v.b[6] |= 0x40; // version 4
v.b[8] &= 0x3f;
v.b[8] |= 0x80; // IETF variant
return UUID(net::hton(v.w.msb), net::hton(v.w.lsb));
static thread_local std::mt19937_64 engine(std::random_device().operator()());
static thread_local std::uniform_int_distribution<uint64_t> dist;
uint64_t msb, lsb;
msb = dist(engine);
lsb = dist(engine);
msb &= ~uint64_t(0x0f << 12);
msb |= 0x4 << 12; // version 4
lsb &= ~(uint64_t(0x3) << 62);
lsb |= uint64_t(0x2) << 62; // IETF variant
return UUID(msb, lsb);
}
std::ostream& operator<<(std::ostream& out, const UUID& uuid) {