sstables: introduce size-based sampling for sstable summary

Currently, a summary entry is added after min_index_interval index
entries were written. Not taking into account size of index entries
becomes a problem with large partitions which may create big index
entries due to promoted indexes. Read performance is affected as a
consequence because index entries spanned by summary are all read
from disk to serve request.

What we wanna do is to also add a summary entry after index reaches
a boundary. To deal with oversampling, we want to write 1 byte to
summary for every 2000 bytes written to data file (this will be
eventually made into an option in the config file).
Both conditions must be met to avoid under or oversampling.
That way, the amount of data needed from index file to satify the
request is drastically reduced.

Fixes #1842.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2017-08-10 02:16:20 -03:00
parent da7489720b
commit 8726ee937d
5 changed files with 54 additions and 32 deletions

View File

@@ -1764,7 +1764,6 @@ static void prepare_summary(summary& s, uint64_t expected_partition_count, uint3
throw malformed_sstable_exception("Current sampling level (" + to_sstring(downsampling::BASE_SAMPLING_LEVEL) + ") not enough to generate summary.");
}
s.keys_written = 0;
s.header.memory_size = 0;
}
@@ -1802,13 +1801,6 @@ static void prepare_compression(compression& c, const schema& schema) {
c.init_full_checksum();
}
static void maybe_add_summary_entry(summary& s, const dht::token& token, bytes_view key, uint64_t offset) {
// Maybe add summary entry into in-memory representation of summary file.
if ((s.keys_written++ % s.header.min_index_interval) == 0) {
s.entries.push_back({ token, bytes(key.data(), key.size()), offset });
}
}
static
void
populate_statistics_offsets(statistics& s) {
@@ -1873,6 +1865,29 @@ static void seal_statistics(statistics& s, metadata_collector& collector,
populate_statistics_offsets(s);
}
void components_writer::maybe_add_summary_entry(summary& s, const dht::token& token, bytes_view key, uint64_t data_offset,
uint64_t index_offset, uint64_t& next_data_offset_to_write_summary) {
static constexpr size_t target_index_interval_size = 65536;
static constexpr size_t summary_byte_cost = 2000; // TODO: use configuration file for it.
auto index_size_for_current_entry = index_offset - (s.entries.size() ? s.entries.back().position : 0);
// generates a summary entry after 64 KB of index data *iff* we're writing 2000 (default value) to data
// for every 1 byte written to summary. 64 KB condition will prevent useless generation of summary entry
// for small key with lots of data. Both conditions will prevent summary from growing large for large
// keys with little data.
if (!next_data_offset_to_write_summary || (data_offset >= next_data_offset_to_write_summary &&
index_size_for_current_entry >= target_index_interval_size)) {
next_data_offset_to_write_summary += summary_byte_cost * key.size();
s.entries.push_back({ token, bytes(key.data(), key.size()), index_offset });
}
}
void components_writer::maybe_add_summary_entry(const dht::token& token, bytes_view key) {
return maybe_add_summary_entry(_sst._components->summary, token, key, get_offset(),
_index.offset(), _next_data_offset_to_write_summary);
}
// Returns offset into data component.
uint64_t components_writer::get_offset() const {
if (_sst.has_component(sstable::component_type::CompressionInfo)) {
@@ -1929,7 +1944,7 @@ void components_writer::consume_new_partition(const dht::decorated_key& dk) {
_partition_key = key::from_partition_key(_schema, dk.key());
maybe_add_summary_entry(_sst._components->summary, dk.token(), bytes_view(*_partition_key), _index.offset());
maybe_add_summary_entry(dk.token(), bytes_view(*_partition_key));
_sst._components->filter->add(bytes_view(*_partition_key));
_sst._collector.add_key(bytes_view(*_partition_key));
@@ -2210,16 +2225,19 @@ future<> sstable::generate_summary(const io_priority_class& pc) {
sstlog.info("Summary file {} not found. Generating Summary...", filename(sstable::component_type::Summary));
class summary_generator {
summary& _summary;
uint64_t _data_size;
uint64_t _next_data_offset_to_write_summary = 0;
public:
std::experimental::optional<key> first_key, last_key;
summary_generator(summary& s) : _summary(s) {}
summary_generator(summary& s, uint64_t data_size) : _summary(s), _data_size(data_size) {}
bool should_continue() {
return true;
}
void consume_entry(index_entry&& ie, uint64_t offset) {
void consume_entry(index_entry&& ie, uint64_t index_offset) {
auto token = dht::global_partitioner().get_token(ie.get_key());
maybe_add_summary_entry(_summary, token, ie.get_key_bytes(), offset);
components_writer::maybe_add_summary_entry(_summary, token, ie.get_key_bytes(), _data_size, index_offset,
_next_data_offset_to_write_summary);
if (!first_key) {
first_key = key(to_bytes(ie.get_key_bytes()));
} else {
@@ -2230,17 +2248,20 @@ future<> sstable::generate_summary(const io_priority_class& pc) {
return open_checked_file_dma(_read_error_handler, filename(component_type::Index), open_flags::ro).then([this, &pc] (file index_file) {
return do_with(std::move(index_file), [this, &pc] (file index_file) {
return index_file.size().then([this, &pc, index_file] (auto size) {
return seastar::when_all_succeed(
io_check([&] { return engine().file_size(this->filename(sstable::component_type::Data)); }),
index_file.size()).then([this, &pc, index_file] (auto data_size, auto index_size) {
// an upper bound. Surely to be less than this.
auto estimated_partitions = size / sizeof(uint64_t);
auto estimated_partitions = index_size / sizeof(uint64_t);
prepare_summary(_components->summary, estimated_partitions, _schema->min_index_interval());
file_input_stream_options options;
options.buffer_size = sstable_buffer_size;
options.io_priority_class = pc;
auto stream = make_file_input_stream(index_file, 0, size, std::move(options));
return do_with(summary_generator(_components->summary), [this, &pc, stream = std::move(stream), size] (summary_generator& s) mutable {
auto ctx = make_lw_shared<index_consume_entry_context<summary_generator>>(s, std::move(stream), 0, size);
auto stream = make_file_input_stream(index_file, 0, index_size, std::move(options));
return do_with(summary_generator(_components->summary, data_size),
[this, &pc, stream = std::move(stream), index_size] (summary_generator& s) mutable {
auto ctx = make_lw_shared<index_consume_entry_context<summary_generator>>(s, std::move(stream), 0, index_size);
return ctx->consume_input(*ctx).finally([ctx] {
return ctx->close();
}).then([this, ctx, &s] {

View File

@@ -797,7 +797,9 @@ class components_writer {
// Remember first and last keys, which we need for the summary file.
stdx::optional<key> _first_key, _last_key;
stdx::optional<key> _partition_key;
uint64_t _next_data_offset_to_write_summary = 0;
private:
void maybe_add_summary_entry(const dht::token& token, bytes_view key);
uint64_t get_offset() const;
file_writer index_file_writer(sstable& sst, const io_priority_class& pc);
void ensure_tombstone_is_written() {
@@ -810,7 +812,8 @@ public:
~components_writer();
components_writer(components_writer&& o) : _sst(o._sst), _schema(o._schema), _out(o._out), _index(std::move(o._index)),
_index_needs_close(o._index_needs_close), _max_sstable_size(o._max_sstable_size), _tombstone_written(o._tombstone_written),
_first_key(std::move(o._first_key)), _last_key(std::move(o._last_key)), _partition_key(std::move(o._partition_key)) {
_first_key(std::move(o._first_key)), _last_key(std::move(o._last_key)), _partition_key(std::move(o._partition_key)),
_next_data_offset_to_write_summary(o._next_data_offset_to_write_summary) {
o._index_needs_close = false;
}
@@ -821,6 +824,9 @@ public:
stop_iteration consume(range_tombstone&& rt);
stop_iteration consume_end_of_partition();
void consume_end_of_stream();
static void maybe_add_summary_entry(summary& s, const dht::token& token, bytes_view key, uint64_t data_offset,
uint64_t index_offset, uint64_t& next_data_offset_to_write_summary);
};
class sstable_writer {

View File

@@ -219,10 +219,6 @@ struct summary_ka {
disk_string<uint32_t> first_key;
disk_string<uint32_t> last_key;
// Used to determine when a summary entry should be added based on min_index_interval.
// NOTE: keys_written isn't part of on-disk format of summary.
size_t keys_written;
// NOTE4: There is a structure written by Cassandra into the end of the Summary
// file, after the field last_key, that we haven't understand yet, but we know
// that its content isn't related to the summary itself.

View File

@@ -582,8 +582,7 @@ SEASTAR_TEST_CASE(datafile_generation_08) {
const column_definition& r1_col = *s->get_column_definition("r1");
// Create 150 partitions so that summary file store 2 entries, assuming min index
// interval is 128.
// TODO: generate sstable which will have 2 samples with size-based sampling.
for (int32_t i = 0; i < 150; i++) {
auto key = partition_key::from_exploded(*s, {int32_type->decompose(i)});
auto c_key = clustering_key::from_exploded(*s, {to_bytes("abc")});
@@ -605,13 +604,13 @@ SEASTAR_TEST_CASE(datafile_generation_08) {
auto buf = bufptr.get();
size_t offset = 0;
std::vector<uint8_t> header = { /* min_index_interval */ 0, 0, 0, 0x80, /* size */ 0, 0, 0, 2,
/* memory_size */ 0, 0, 0, 0, 0, 0, 0, 0x20, /* sampling_level */ 0, 0, 0, 0x80,
/* size_at_full_sampling */ 0, 0, 0, 2 };
std::vector<uint8_t> header = { /* min_index_interval */ 0, 0, 0, 0x80, /* size */ 0, 0, 0, 1,
/* memory_size */ 0, 0, 0, 0, 0, 0, 0, 0x10, /* sampling_level */ 0, 0, 0, 0x80,
/* size_at_full_sampling */ 0, 0, 0, 1 };
BOOST_REQUIRE(::memcmp(header.data(), &buf[offset], header.size()) == 0);
offset += header.size();
std::vector<uint8_t> positions = { 0x8, 0, 0, 0, 0x14, 0, 0, 0 };
std::vector<uint8_t> positions = { 0x4, 0, 0, 0 };
BOOST_REQUIRE(::memcmp(positions.data(), &buf[offset], positions.size()) == 0);
offset += positions.size();
@@ -619,10 +618,6 @@ SEASTAR_TEST_CASE(datafile_generation_08) {
BOOST_REQUIRE(::memcmp(first_entry.data(), &buf[offset], first_entry.size()) == 0);
offset += first_entry.size();
std::vector<uint8_t> second_entry = { /* key */ 0, 0, 0, 0x65, /* position */ 0, 0x9, 0, 0, 0, 0, 0, 0 };
BOOST_REQUIRE(::memcmp(second_entry.data(), &buf[offset], second_entry.size()) == 0);
offset += second_entry.size();
std::vector<uint8_t> first_key = { 0, 0, 0, 0x4, 0, 0, 0, 0x17 };
BOOST_REQUIRE(::memcmp(first_key.data(), &buf[offset], first_key.size()) == 0);
offset += first_key.size();

View File

@@ -196,9 +196,13 @@ SEASTAR_TEST_CASE(missing_summary_query_negative_fail) {
return summary_query_fail<-uint64_t(2), 0, 5>(uncompressed_schema(), "tests/sstables/uncompressed", 2);
}
// TODO: only one interval is generated with size-based sampling. Test it with a sstable that will actually result
// in two intervals.
#if 0
SEASTAR_TEST_CASE(missing_summary_interval_1_query_ok) {
return summary_query<1, 19, 6>(uncompressed_schema(1), "tests/sstables/uncompressed", 2);
}
#endif
SEASTAR_TEST_CASE(missing_summary_first_last_sane) {
return reusable_sst(uncompressed_schema(), "tests/sstables/uncompressed", 2).then([] (sstable_ptr ptr) {