sstables: mutation_reader: Use index_reader for single-partition reads

This switches single-partition query to use the index_reader
infrastructure. Index lookups via index_reader are faster than
find_disk_ranges().

perf_fast_forward, rows: 1000000, value size: 100

Before:

  Testing forwarding with clustering restriction in a large partition:
  pk-scan   time [s]     frags     frag/s    aio      [KiB] blocked dropped  idx hit idx miss  idx blk    cpu
  no        0.002182         2        916      3        152       2       0        0        1        1  88.1%

After:

  Testing forwarding with clustering restriction in a large partition:
  pk-scan   time [s]     frags     frag/s    aio      [KiB] blocked dropped  idx hit idx miss  idx blk    cpu
  no        0.000758         2       2639      3        152       2       0        0        1        1  48.6%

This is also a cleanup, a step towards converting all code to use the
index_reader.
This commit is contained in:
Tomasz Grabiec
2017-04-11 11:14:01 +02:00
parent 9d8795089d
commit 4742008b70
3 changed files with 64 additions and 186 deletions

View File

@@ -497,7 +497,7 @@ public:
return parallel_for_each(std::move(candidates),
[this](const lw_shared_ptr<sstables::sstable>& sstable) {
tracing::trace(_trace_state, "Reading key {} from sstable {}", _pr, seastar::value_of([&sstable] { return sstable->get_filename(); }));
return sstable->read_row(_schema, _key, _slice, _pc, _fwd).then([this](auto smo) {
return sstable->read_row(_schema, _pr.start()->value(), _slice, _pc, _fwd).then([this](auto smo) {
if (smo) {
_mutations.emplace_back(std::move(*smo));
}

View File

@@ -760,6 +760,8 @@ struct sstable_data_source : public enable_lw_shared_from_this<sstable_data_sour
schema_ptr _schema;
stdx::optional<dht::decorated_key> _key;
struct single_partition_tag {};
sstable_data_source(schema_ptr s, shared_sstable sst, mp_row_consumer&& consumer)
: _sst(std::move(sst))
, _consumer(std::move(consumer))
@@ -778,11 +780,16 @@ struct sstable_data_source : public enable_lw_shared_from_this<sstable_data_sour
, _schema(std::move(s))
{ }
sstable_data_source(schema_ptr s, shared_sstable sst, const sstables::key& k, const io_priority_class& pc,
const query::partition_slice& slice, sstable::disk_read_range toread, streamed_mutation::forwarding fwd)
sstable_data_source(single_partition_tag, schema_ptr s, shared_sstable sst, mp_row_consumer&& consumer,
std::unique_ptr<index_reader> lh_index, std::unique_ptr<index_reader> rh_index)
: _sst(std::move(sst))
, _consumer(k, s, slice, pc, fwd)
, _context(_sst->data_consume_single_partition(_consumer, std::move(toread)))
, _consumer(std::move(consumer))
, _read_enabled(lh_index->data_file_position() != rh_index->data_file_position())
, _context(_sst->data_consume_single_partition(_consumer,
sstable::disk_read_range(lh_index->data_file_position(), rh_index->data_file_position())))
, _lh_index(std::move(lh_index))
, _rh_index(std::move(rh_index))
, _schema(std::move(s))
{ }
~sstable_data_source() {
@@ -871,21 +878,6 @@ public:
});
});
}
static future<streamed_mutation> create(schema_ptr s, shared_sstable sst, const sstables::key& k,
const query::partition_slice& slice,
const io_priority_class& pc,
sstable::disk_read_range toread,
streamed_mutation::forwarding fwd)
{
auto ds = make_lw_shared<sstable_data_source>(s, sst, k, pc, slice, std::move(toread), fwd);
return ds->_context.read().then([s, ds] {
auto mut = ds->_consumer.get_mutation();
assert(mut);
auto dk = dht::global_partitioner().decorate_key(*s, std::move(mut->key));
return make_streamed_mutation<sstable_streamed_mutation>(s, std::move(dk), mut->tomb, ds);
});
}
};
row_consumer::proceed
@@ -1027,21 +1019,10 @@ sstables::sstable::read_row(schema_ptr schema,
const sstables::key& key,
const query::partition_slice& slice,
const io_priority_class& pc,
streamed_mutation::forwarding fwd) {
assert(schema);
return find_disk_ranges(schema, key, slice, pc).then([this, &key, &slice, &pc, schema, fwd] (disk_read_range toread) {
if (!toread.found_row()) {
_filter_tracker.add_false_positive();
}
if (!toread) {
return make_ready_future<streamed_mutation_opt>();
}
_filter_tracker.add_true_positive();
return sstable_streamed_mutation::create(schema, this->shared_from_this(), key, slice, pc, std::move(toread), fwd).then([] (auto sm) {
return streamed_mutation_opt(std::move(sm));
});
streamed_mutation::forwarding fwd)
{
return do_with(dht::global_partitioner().decorate_key(*schema, key.to_partition_key(*schema)), [this, schema, &slice, &pc, fwd] (auto& dk) {
return this->read_row(schema, dk, slice, pc, fwd);
});
}
@@ -1071,156 +1052,6 @@ static inline bytes_view consume_bytes(bytes_view& p, size_t len) {
return ret;
}
static inline clustering_key_prefix get_clustering_key(
const schema& schema, bytes_view col_name) {
mp_row_consumer::column col(schema, std::move(col_name), api::max_timestamp);
return std::move(col.clustering);
}
static bool has_static_columns(const schema& schema, index_entry &ie) {
// We can easily check if there are any static columns in this partition,
// because the static columns always come first, so the first promoted
// index block will start with one, if there are any. The name of a static
// column is a composite beginning with a special marker (0xffff).
// But we can only assume the column name is composite if the schema is
// compound - if it isn't, we cannot have any static columns anyway.
//
// The first 18 bytes are deletion times (4+8), num blocks (4), and
// length of start column (2). Then come the actual column name bytes.
// See also composite::is_static().
auto data = ie.get_promoted_index_bytes();
return schema.is_compound() && data.size() >= 20 && data[18] == -1 && data[19] == -1;
}
future<sstable::disk_read_range>
sstables::sstable::find_disk_ranges(
schema_ptr schema, const sstables::key& key,
const query::partition_slice& slice,
const io_priority_class& pc) {
auto& partitioner = dht::global_partitioner();
auto token = partitioner.get_token(key_view(key));
if (token < partitioner.get_token(key_view(_components->summary.first_key.value))
|| token > partitioner.get_token(key_view(_components->summary.last_key.value))) {
return make_ready_future<disk_read_range>();
}
auto summary_idx = adjust_binary_search_index(binary_search(_components->summary.entries, key, token));
if (summary_idx < 0) {
return make_ready_future<disk_read_range>();
}
return read_indexes(summary_idx, pc).then([this, schema, &slice, &key, token, summary_idx, &pc] (auto index_list) {
auto index_idx = binary_search(index_list, key, token);
if (index_idx < 0) {
return make_ready_future<disk_read_range>();
}
index_entry& ie = index_list[index_idx];
if (ie.get_promoted_index_bytes().size() >= 16) {
try {
auto&& pkey = partition_key::from_exploded(*schema, key.explode(*schema));
auto ck_ranges = query::clustering_key_filter_ranges::get_ranges(*schema, slice, pkey);
if (ck_ranges.size() == 1 && ck_ranges.begin()->is_full()) {
// When no clustering filter is given to sstable::read_row(),
// we get here one range unbounded on both sides. This is fine
// (the code below will work with an unbounded range), but
// let's drop this range to revert to the classic behavior of
// reading entire sstable row without using the promoted index
} else if (has_static_columns(*schema, ie)) {
// FIXME: If we need to read the static columns and also a
// non-full clustering key range, we need to return two byte
// ranges in the returned disk_read_range. We don't support
// this yet so for now let's fall back to reading the entire
// partition which is wasteful but at least correct.
// This case should be replaced by correctly adding the static
// column's blocks to the return.
} else if (ck_ranges.size() == 1) {
auto data = ie.get_promoted_index_bytes();
// note we already verified above that data.size >= 16
sstables::deletion_time deltime;
deltime.local_deletion_time = consume_be<uint32_t>(data);
deltime.marked_for_delete_at = consume_be<uint64_t>(data);
uint32_t num_blocks = consume_be<uint32_t>(data);
// We do a linear search on the promoted index. If we were to
// look in the same promoted index several times it might have
// made sense to build an array of key starts so we can do a
// binary search. We could do this once we have a key cache.
auto& range_start = ck_ranges.begin()->start();
bool found_range_start = false;
uint64_t range_start_pos;
auto& range_end = ck_ranges.begin()->end();
auto cmp = clustering_key_prefix::tri_compare(*schema);
while (num_blocks--) {
uint16_t len = consume_be<uint16_t>(data);
// The promoted index contains ranges of full column
// names, which may include a clustering key and column.
// But we only need to match the clustering key, because
// we got a clustering key range to search for.
auto start_ck = get_clustering_key(*schema,
consume_bytes(data, len));
len = consume_be<uint16_t>(data);
auto end_ck = get_clustering_key(*schema,
consume_bytes(data, len));
uint64_t offset = consume_be<uint64_t>(data);
uint64_t width = consume_be<uint64_t>(data);
if (!found_range_start) {
if (!range_start || cmp(range_start->value(), end_ck) <= 0) {
range_start_pos = ie.position() + offset;
found_range_start = true;
}
}
bool found_range_end = false;
uint64_t range_end_pos;
if (range_end) {
if (cmp(range_end->value(), start_ck) < 0) {
// this block is already past the range_end
found_range_end = true;
range_end_pos = ie.position() + offset;
} else if (cmp(range_end->value(), end_ck) < 0 || num_blocks == 0) {
// range_end is in the middle of this block.
// Note the strict inequality above is important:
// if range_end==end_ck the next block may contain
// still more items matching range_end.
found_range_end = true;
range_end_pos = ie.position() + offset + width;
}
} else if (num_blocks == 0) {
// When !range_end, read until the last block.
// In this case we could have also found the end of
// the partition using the index.
found_range_end = true;
range_end_pos = ie.position() + offset + width;
}
if (found_range_end) {
if (!found_range_start) {
// return empty range
range_start_pos = range_end_pos = 0;
}
return make_ready_future<disk_read_range>(
disk_read_range(range_start_pos, range_end_pos,
key, deltime));
}
}
}
// Else, if more than one clustering-key range needs to be read,
// fall back to reading the entire partition.
// FIXME: support multiple ranges, and do not fall back to reading
// the entire partition.
} catch (...) {
// Fall back to reading whole partition
sstlog.error("Failed to parse promoted index for sstable {}, page {}, index {}: {}", this->get_filename(),
summary_idx, index_idx, std::current_exception());
}
}
// If we're still here there is no promoted index, or we had problems
// using it, so just just find the entire partition's range.
auto start = ie.position();
return this->data_end_position(summary_idx, index_idx, index_list, pc).then([start] (uint64_t end) {
return disk_read_range(start, end);
});
});
}
promoted_index promoted_index_view::parse(const schema& s) const {
bytes_view data = _bytes;
@@ -1437,6 +1268,46 @@ mutation_reader sstable::read_rows(schema_ptr schema, const io_priority_class& p
return std::make_unique<mutation_reader::impl>(shared_from_this(), schema, pc, fwd);
}
static
future<> advance_to_upper_bound(index_reader& ix, const schema& s, const query::partition_slice& slice, dht::ring_position_view key) {
auto& ranges = slice.row_ranges(s, *key.key());
if (ranges.empty()) {
return ix.advance_past(position_in_partition_view::for_static_row());
} else {
return ix.advance_past(position_in_partition_view::for_range_end(ranges[ranges.size() - 1]));
}
}
future<streamed_mutation_opt>
sstables::sstable::read_row(schema_ptr schema,
dht::ring_position_view key,
const query::partition_slice& slice,
const io_priority_class& pc,
streamed_mutation::forwarding fwd)
{
auto lh_index = get_index_reader(pc);
auto f = lh_index->advance_and_check_if_present(key);
return f.then([this, &slice, &pc, fwd, lh_index = std::move(lh_index), s = std::move(schema), key] (bool present) mutable {
if (!present) {
_filter_tracker.add_false_positive();
return make_ready_future<streamed_mutation_opt>(stdx::nullopt);
}
_filter_tracker.add_true_positive();
auto rh_index = std::make_unique<index_reader>(*lh_index);
auto f = advance_to_upper_bound(*rh_index, *_schema, slice, key);
return f.then([this, &slice, &pc, fwd, lh_index = std::move(lh_index), rh_index = std::move(rh_index), s = std::move(s)] () mutable {
auto consumer = mp_row_consumer(s, slice, pc, fwd);
auto ds = make_lw_shared<sstable_data_source>(sstable_data_source::single_partition_tag(), std::move(s),
shared_from_this(), std::move(consumer), std::move(lh_index), std::move(rh_index));
ds->_will_likely_slice = sstable_data_source::will_likely_slice(slice);
ds->_index_in_current_partition = true;
return ds->read_partition().finally([ds]{});
});
});
}
mutation_reader
sstable::read_range_rows(schema_ptr schema,
const dht::partition_range& range,

View File

@@ -272,7 +272,14 @@ public:
// returned in the result.
future<streamed_mutation_opt> read_row(
schema_ptr schema,
const key& k,
dht::ring_position_view key,
const query::partition_slice& slice = query::full_slice,
const io_priority_class& pc = default_priority_class(),
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no);
future<streamed_mutation_opt> read_row(
schema_ptr schema,
const sstables::key& key,
const query::partition_slice& slice = query::full_slice,
const io_priority_class& pc = default_priority_class(),
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no);