Merge "size-based sampling for sstable summary" from Raphael

"Fixes #1842."

* 'size_based_sampling_v3' of github.com:raphaelsc/scylla:
  tests: test summary entry spanning more keys than min interval
  db/config: introduce sstable_summary_ratio option
  sstables: introduce size-based sampling for sstable summary
  sstables: make components_writer::offset const qualified and uint64_t
  sstables: make writer::offset const qualified and uint64_t
This commit is contained in:
Avi Kivity
2017-08-11 18:41:45 +03:00
7 changed files with 113 additions and 36 deletions

View File

@@ -775,6 +775,8 @@ public:
val(abort_on_lsa_bad_alloc, bool, false, Used, "Abort when allocation in LSA region fails") \
val(murmur3_partitioner_ignore_msb_bits, unsigned, 0, Used, "Number of most siginificant token bits to ignore in murmur3 partitioner; increase for very large clusters") \
val(virtual_dirty_soft_limit, double, 0.6, Used, "Soft limit of virtual dirty memory expressed as a portion of the hard limit") \
val(sstable_summary_ratio, double, 0.0005, Used, "Enforces that 1 byte of summary is written for every N (2000 by default) " \
"bytes written to data file. Value must be between 0 and 1.") \
/* done! */
#define _make_value_member(name, type, deflt, status, desc, ...) \

View File

@@ -68,6 +68,8 @@ namespace sstables {
logging::logger sstlog("sstable");
static const db::config& get_config();
seastar::shared_ptr<write_monitor> default_write_monitor() {
static thread_local seastar::shared_ptr<write_monitor> monitor = seastar::make_shared<noop_write_monitor>();
return monitor;
@@ -1764,7 +1766,6 @@ static void prepare_summary(summary& s, uint64_t expected_partition_count, uint3
throw malformed_sstable_exception("Current sampling level (" + to_sstring(downsampling::BASE_SAMPLING_LEVEL) + ") not enough to generate summary.");
}
s.keys_written = 0;
s.header.memory_size = 0;
}
@@ -1802,13 +1803,6 @@ static void prepare_compression(compression& c, const schema& schema) {
c.init_full_checksum();
}
// Count-based sampling: records a summary entry for one key out of every
// s.header.min_index_interval keys seen. The key counter is advanced on every
// call, whether or not an entry ends up being recorded.
static void maybe_add_summary_entry(summary& s, const dht::token& token, bytes_view key, uint64_t offset) {
    // Ordinal of this key among all keys seen so far (0-based).
    const auto key_ordinal = s.keys_written++;
    if (key_ordinal % s.header.min_index_interval != 0) {
        return;
    }
    s.entries.push_back({ token, bytes(key.data(), key.size()), offset });
}
static
void
populate_statistics_offsets(statistics& s) {
@@ -1873,8 +1867,30 @@ static void seal_statistics(statistics& s, metadata_collector& collector,
populate_statistics_offsets(s);
}
// Size-based sampling: decides whether the current partition gets a summary
// entry and, if so, appends it to s.entries.
//
// An entry is always appended while next_data_offset_to_write_summary is still
// zero. Otherwise an entry is appended only when both conditions hold:
//   - data_offset has reached next_data_offset_to_write_summary, i.e. enough
//     bytes went to the data file to "pay" for the summary bytes written so far
//     (summary_byte_cost data bytes per summary byte, 2000 by default);
//   - at least 64 KB of index data separates the previous entry's position
//     from index_offset, which avoids useless entries for small keys that
//     carry a lot of data.
// Together the two conditions keep the summary from growing large for large
// keys with little data.
//
// Every appended entry advances next_data_offset_to_write_summary by
// summary_byte_cost * key.size().
void components_writer::maybe_add_summary_entry(summary& s, const dht::token& token, bytes_view key, uint64_t data_offset,
        uint64_t index_offset, uint64_t& next_data_offset_to_write_summary, size_t summary_byte_cost) {
    static constexpr size_t target_index_interval_size = 65536;

    const auto previous_entry_position = s.entries.size() ? s.entries.back().position : 0;
    const auto index_bytes_since_last_entry = index_offset - previous_entry_position;

    const bool threshold_still_unset = (next_data_offset_to_write_summary == 0);
    const bool data_budget_reached = data_offset >= next_data_offset_to_write_summary;
    const bool index_interval_reached = index_bytes_since_last_entry >= target_index_interval_size;

    if (!threshold_still_unset && !(data_budget_reached && index_interval_reached)) {
        return;
    }

    next_data_offset_to_write_summary += summary_byte_cost * key.size();
    s.entries.push_back({ token, bytes(key.data(), key.size()), index_offset });
}
// Convenience overload used while writing an sstable: forwards to the static
// helper with this writer's summary component, its current data and index
// offsets, and its per-writer sampling state.
void components_writer::maybe_add_summary_entry(const dht::token& token, bytes_view key) {
    auto& summary_component = _sst._components->summary;
    const uint64_t data_offset = get_offset();
    const uint64_t index_offset = _index.offset();
    maybe_add_summary_entry(summary_component, token, key, data_offset, index_offset,
            _next_data_offset_to_write_summary, _summary_byte_cost);
}
// Returns offset into data component.
size_t components_writer::get_offset() {
uint64_t components_writer::get_offset() const {
if (_sst.has_component(sstable::component_type::CompressionInfo)) {
// Variable returned by compressed_file_length() is constantly updated by compressed output stream.
return _sst._components->compression.compressed_file_length();
@@ -1903,6 +1919,13 @@ static const db::config& get_config() {
}
}
// Returns the cost for writing a byte to summary such that the ratio of summary
// to data will be 1 to cost by the time sstable is sealed.
//
// The cost is the reciprocal of the sstable_summary_ratio option, truncated to
// an integer. The option is documented as a value between 0 and 1; anything
// outside (0, 1] falls back to components_writer::default_summary_byte_cost:
//   - 0 keeps its original meaning of "use the default";
//   - a negative or NaN ratio would otherwise make the double -> size_t
//     conversion undefined;
//   - a ratio above 1 would otherwise truncate to a cost of 0, which keeps the
//     next-summary data offset stuck at 0 and samples every single partition.
static size_t summary_byte_cost() {
    const double summary_ratio = get_config().sstable_summary_ratio();
    // Written as a negated conjunction so that NaN is rejected as well.
    if (!(summary_ratio > 0 && summary_ratio <= 1)) {
        return components_writer::default_summary_byte_cost;
    }
    return static_cast<size_t>(1 / summary_ratio);
}
components_writer::components_writer(sstable& sst, const schema& s, file_writer& out,
uint64_t estimated_partitions,
const sstable_writer_config& cfg,
@@ -1914,6 +1937,7 @@ components_writer::components_writer(sstable& sst, const schema& s, file_writer&
, _index_needs_close(true)
, _max_sstable_size(cfg.max_sstable_size)
, _tombstone_written(false)
, _summary_byte_cost(summary_byte_cost())
{
_sst._components->filter = utils::i_filter::get_filter(estimated_partitions, _schema.bloom_filter_fp_chance());
_sst._pi_write.desired_block_size = cfg.promoted_index_block_size.value_or(get_config().column_index_size_in_kb() * 1024);
@@ -1929,7 +1953,7 @@ void components_writer::consume_new_partition(const dht::decorated_key& dk) {
_partition_key = key::from_partition_key(_schema, dk.key());
maybe_add_summary_entry(_sst._components->summary, dk.token(), bytes_view(*_partition_key), _index.offset());
maybe_add_summary_entry(dk.token(), bytes_view(*_partition_key));
_sst._components->filter->add(bytes_view(*_partition_key));
_sst._collector.add_key(bytes_view(*_partition_key));
@@ -2210,16 +2234,20 @@ future<> sstable::generate_summary(const io_priority_class& pc) {
sstlog.info("Summary file {} not found. Generating Summary...", filename(sstable::component_type::Summary));
class summary_generator {
summary& _summary;
uint64_t _data_size;
size_t _summary_byte_cost;
uint64_t _next_data_offset_to_write_summary = 0;
public:
std::experimental::optional<key> first_key, last_key;
summary_generator(summary& s) : _summary(s) {}
summary_generator(summary& s, uint64_t data_size) : _summary(s), _data_size(data_size), _summary_byte_cost(summary_byte_cost()) {}
bool should_continue() {
return true;
}
void consume_entry(index_entry&& ie, uint64_t offset) {
void consume_entry(index_entry&& ie, uint64_t index_offset) {
auto token = dht::global_partitioner().get_token(ie.get_key());
maybe_add_summary_entry(_summary, token, ie.get_key_bytes(), offset);
components_writer::maybe_add_summary_entry(_summary, token, ie.get_key_bytes(), _data_size, index_offset,
_next_data_offset_to_write_summary, _summary_byte_cost);
if (!first_key) {
first_key = key(to_bytes(ie.get_key_bytes()));
} else {
@@ -2230,17 +2258,20 @@ future<> sstable::generate_summary(const io_priority_class& pc) {
return open_checked_file_dma(_read_error_handler, filename(component_type::Index), open_flags::ro).then([this, &pc] (file index_file) {
return do_with(std::move(index_file), [this, &pc] (file index_file) {
return index_file.size().then([this, &pc, index_file] (auto size) {
return seastar::when_all_succeed(
io_check([&] { return engine().file_size(this->filename(sstable::component_type::Data)); }),
index_file.size()).then([this, &pc, index_file] (auto data_size, auto index_size) {
// An upper bound; the actual number of partitions is surely lower than this.
auto estimated_partitions = size / sizeof(uint64_t);
auto estimated_partitions = index_size / sizeof(uint64_t);
prepare_summary(_components->summary, estimated_partitions, _schema->min_index_interval());
file_input_stream_options options;
options.buffer_size = sstable_buffer_size;
options.io_priority_class = pc;
auto stream = make_file_input_stream(index_file, 0, size, std::move(options));
return do_with(summary_generator(_components->summary), [this, &pc, stream = std::move(stream), size] (summary_generator& s) mutable {
auto ctx = make_lw_shared<index_consume_entry_context<summary_generator>>(s, std::move(stream), 0, size);
auto stream = make_file_input_stream(index_file, 0, index_size, std::move(options));
return do_with(summary_generator(_components->summary, data_size),
[this, &pc, stream = std::move(stream), index_size] (summary_generator& s) mutable {
auto ctx = make_lw_shared<index_consume_entry_context<summary_generator>>(s, std::move(stream), 0, index_size);
return ctx->consume_input(*ctx).finally([ctx] {
return ctx->close();
}).then([this, ctx, &s] {

View File

@@ -797,8 +797,12 @@ class components_writer {
// Remember first and last keys, which we need for the summary file.
stdx::optional<key> _first_key, _last_key;
stdx::optional<key> _partition_key;
uint64_t _next_data_offset_to_write_summary = 0;
// Enforces ratio of summary to data of 1 to N.
size_t _summary_byte_cost = default_summary_byte_cost;
private:
size_t get_offset();
void maybe_add_summary_entry(const dht::token& token, bytes_view key);
uint64_t get_offset() const;
file_writer index_file_writer(sstable& sst, const io_priority_class& pc);
void ensure_tombstone_is_written() {
if (!_tombstone_written) {
@@ -810,7 +814,8 @@ public:
~components_writer();
components_writer(components_writer&& o) : _sst(o._sst), _schema(o._schema), _out(o._out), _index(std::move(o._index)),
_index_needs_close(o._index_needs_close), _max_sstable_size(o._max_sstable_size), _tombstone_written(o._tombstone_written),
_first_key(std::move(o._first_key)), _last_key(std::move(o._last_key)), _partition_key(std::move(o._partition_key)) {
_first_key(std::move(o._first_key)), _last_key(std::move(o._last_key)), _partition_key(std::move(o._partition_key)),
_next_data_offset_to_write_summary(o._next_data_offset_to_write_summary), _summary_byte_cost(o._summary_byte_cost) {
o._index_needs_close = false;
}
@@ -821,6 +826,10 @@ public:
stop_iteration consume(range_tombstone&& rt);
stop_iteration consume_end_of_partition();
void consume_end_of_stream();
static constexpr size_t default_summary_byte_cost = 2000;
static void maybe_add_summary_entry(summary& s, const dht::token& token, bytes_view key, uint64_t data_offset,
uint64_t index_offset, uint64_t& next_data_offset_to_write_summary, size_t summary_byte_cost);
};
class sstable_writer {

View File

@@ -219,10 +219,6 @@ struct summary_ka {
disk_string<uint32_t> first_key;
disk_string<uint32_t> last_key;
// Used to determine when a summary entry should be added based on min_index_interval.
// NOTE: keys_written isn't part of on-disk format of summary.
size_t keys_written;
// NOTE4: There is a structure written by Cassandra at the end of the Summary
// file, after the field last_key, that we don't understand yet, but we know
// that its content isn't related to the summary itself.

View File

@@ -31,7 +31,7 @@ namespace sstables {
class file_writer {
output_stream<char> _out;
size_t _offset = 0;
uint64_t _offset = 0;
public:
file_writer(file f, file_output_stream_options options)
: _out(make_file_output_stream(std::move(f), std::move(options))) {}
@@ -56,7 +56,7 @@ public:
future<> close() {
return _out.close();
}
size_t offset() {
uint64_t offset() const {
return _offset;
}
};

View File

@@ -582,8 +582,7 @@ SEASTAR_TEST_CASE(datafile_generation_08) {
const column_definition& r1_col = *s->get_column_definition("r1");
// Create 150 partitions so that summary file store 2 entries, assuming min index
// interval is 128.
// TODO: generate sstable which will have 2 samples with size-based sampling.
for (int32_t i = 0; i < 150; i++) {
auto key = partition_key::from_exploded(*s, {int32_type->decompose(i)});
auto c_key = clustering_key::from_exploded(*s, {to_bytes("abc")});
@@ -605,13 +604,13 @@ SEASTAR_TEST_CASE(datafile_generation_08) {
auto buf = bufptr.get();
size_t offset = 0;
std::vector<uint8_t> header = { /* min_index_interval */ 0, 0, 0, 0x80, /* size */ 0, 0, 0, 2,
/* memory_size */ 0, 0, 0, 0, 0, 0, 0, 0x20, /* sampling_level */ 0, 0, 0, 0x80,
/* size_at_full_sampling */ 0, 0, 0, 2 };
std::vector<uint8_t> header = { /* min_index_interval */ 0, 0, 0, 0x80, /* size */ 0, 0, 0, 1,
/* memory_size */ 0, 0, 0, 0, 0, 0, 0, 0x10, /* sampling_level */ 0, 0, 0, 0x80,
/* size_at_full_sampling */ 0, 0, 0, 1 };
BOOST_REQUIRE(::memcmp(header.data(), &buf[offset], header.size()) == 0);
offset += header.size();
std::vector<uint8_t> positions = { 0x8, 0, 0, 0, 0x14, 0, 0, 0 };
std::vector<uint8_t> positions = { 0x4, 0, 0, 0 };
BOOST_REQUIRE(::memcmp(positions.data(), &buf[offset], positions.size()) == 0);
offset += positions.size();
@@ -619,10 +618,6 @@ SEASTAR_TEST_CASE(datafile_generation_08) {
BOOST_REQUIRE(::memcmp(first_entry.data(), &buf[offset], first_entry.size()) == 0);
offset += first_entry.size();
std::vector<uint8_t> second_entry = { /* key */ 0, 0, 0, 0x65, /* position */ 0, 0x9, 0, 0, 0, 0, 0, 0 };
BOOST_REQUIRE(::memcmp(second_entry.data(), &buf[offset], second_entry.size()) == 0);
offset += second_entry.size();
std::vector<uint8_t> first_key = { 0, 0, 0, 0x4, 0, 0, 0, 0x17 };
BOOST_REQUIRE(::memcmp(first_key.data(), &buf[offset], first_key.size()) == 0);
offset += first_key.size();
@@ -4057,3 +4052,43 @@ SEASTAR_TEST_CASE(sstable_expired_data_ratio) {
}
});
}
// Writes 1.5 * min_index_interval partitions into a single sstable, reloads it,
// and checks that (a) the summary holds exactly one entry, so that entry covers
// more keys than min_index_interval, and (b) every written mutation can still
// be read back, in decorated-key order, through the sstable's mutation source.
SEASTAR_TEST_CASE(test_summary_entry_spanning_more_keys_than_min_interval) {
return seastar::async([] {
auto s = make_lw_shared(schema({}, some_keyspace, some_column_family,
{{"p1", int32_type}}, {{"c1", utf8_type}}, {{"r1", int32_type}}, {}, utf8_type));
const column_definition& r1_col = *s->get_column_definition("r1");
std::vector<mutation> mutations;
auto keys_written = 0;
// One small single-cell partition per key; 1.5x the schema's min index interval of them.
for (auto i = 0; i < s->min_index_interval()*1.5; i++) {
auto key = partition_key::from_exploded(*s, {int32_type->decompose(i)});
auto c_key = clustering_key::from_exploded(*s, {to_bytes("abc")});
mutation m(key, s);
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type->decompose(1)));
mutations.push_back(std::move(m));
keys_written++;
}
auto tmp = make_lw_shared<tmpdir>();
// Factory handing out sstables in the temporary directory with increasing generation numbers.
auto sst_gen = [s, tmp, gen = make_lw_shared<unsigned>(1)] () mutable {
return make_lw_shared<sstable>(s, tmp->path, (*gen)++, la, big);
};
auto sst = make_sstable_containing(sst_gen, mutations);
// Re-open the sstable from the temporary directory by generation before inspecting its summary.
sst = reusable_sst(s, tmp->path, sst->generation()).get0();
summary& sum = sstables::test(sst).get_summary();
// All keys must be covered by a single summary entry.
BOOST_REQUIRE(sum.entries.size() == 1);
// Order the expected mutations with mutation_decorated_key_less_comparator before comparing with the reader.
std::set<mutation, mutation_decorated_key_less_comparator> merged;
merged.insert(mutations.begin(), mutations.end());
auto rd = assert_that(sst->as_mutation_source()(s));
auto keys_read = 0;
for (auto&& m : merged) {
keys_read++;
rd.produces(m);
}
rd.produces_end_of_stream();
// Every key that was written must have been produced by the reader.
BOOST_REQUIRE(keys_read == keys_written);
});
}

View File

@@ -196,9 +196,13 @@ SEASTAR_TEST_CASE(missing_summary_query_negative_fail) {
return summary_query_fail<-uint64_t(2), 0, 5>(uncompressed_schema(), "tests/sstables/uncompressed", 2);
}
// TODO: only one interval is generated with size-based sampling. Test it with a sstable that will actually result
// in two intervals.
#if 0
// Compiled out by the surrounding #if 0 (see the TODO preceding it).
// Runs summary_query<1, 19, 6> against the "tests/sstables/uncompressed"
// fixture using uncompressed_schema(1).
SEASTAR_TEST_CASE(missing_summary_interval_1_query_ok) {
return summary_query<1, 19, 6>(uncompressed_schema(1), "tests/sstables/uncompressed", 2);
}
#endif
SEASTAR_TEST_CASE(missing_summary_first_last_sane) {
return reusable_sst(uncompressed_schema(), "tests/sstables/uncompressed", 2).then([] (sstable_ptr ptr) {