From 881c479be869ff0e4a113f99cfa5e50556edb17f Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Thu, 10 Aug 2017 00:37:26 -0300 Subject: [PATCH 1/5] sstables: make writer::offset const qualified and uint64_t Signed-off-by: Raphael S. Carvalho --- sstables/writer.hh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sstables/writer.hh b/sstables/writer.hh index fe89f4a30f..8165f0710c 100644 --- a/sstables/writer.hh +++ b/sstables/writer.hh @@ -31,7 +31,7 @@ namespace sstables { class file_writer { output_stream _out; - size_t _offset = 0; + uint64_t _offset = 0; public: file_writer(file f, file_output_stream_options options) : _out(make_file_output_stream(std::move(f), std::move(options))) {} @@ -56,7 +56,7 @@ public: future<> close() { return _out.close(); } - size_t offset() { + uint64_t offset() const { return _offset; } }; From da7489720b5d6c8be7e2262569d9b4483c944d05 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Thu, 10 Aug 2017 21:48:11 -0300 Subject: [PATCH 2/5] sstables: make components_writer::offset const qualified and uint64_t Signed-off-by: Raphael S. Carvalho --- sstables/sstables.cc | 2 +- sstables/sstables.hh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 66ce6a8376..7ef5d248b7 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1874,7 +1874,7 @@ static void seal_statistics(statistics& s, metadata_collector& collector, } // Returns offset into data component. -size_t components_writer::get_offset() { +uint64_t components_writer::get_offset() const { if (_sst.has_component(sstable::component_type::CompressionInfo)) { // Variable returned by compressed_file_length() is constantly updated by compressed output stream. return _sst._components->compression.compressed_file_length(); diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 993cd5d92e..846817fc54 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -798,7 +798,7 @@ class components_writer { stdx::optional _first_key, _last_key; stdx::optional _partition_key; private: - size_t get_offset(); + uint64_t get_offset() const; file_writer index_file_writer(sstable& sst, const io_priority_class& pc); void ensure_tombstone_is_written() { if (!_tombstone_written) { From 8726ee937da24a1d4a20bf73d98cd434b237027e Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Thu, 10 Aug 2017 02:16:20 -0300 Subject: [PATCH 3/5] sstables: introduce size-based sampling for sstable summary Currently, a summary entry is added after min_index_interval index entries were written. Not taking into account size of index entries becomes a problem with large partitions which may create big index entries due to promoted indexes. Read performance is affected as a consequence because index entries spanned by summary are all read from disk to serve request. What we wanna do is to also add a summary entry after index reaches a boundary. To deal with oversampling, we want to write 1 byte to summary for every 2000 bytes written to data file (this will be eventually made into an option in the config file). Both conditions must be met to avoid under or oversampling. That way, the amount of data needed from index file to satify the request is drastically reduced. Fixes #1842. Signed-off-by: Raphael S. Carvalho --- sstables/sstables.cc | 55 +++++++++++++++++++++++----------- sstables/sstables.hh | 8 ++++- sstables/types.hh | 4 --- tests/sstable_datafile_test.cc | 15 ++++------ tests/sstable_test.cc | 4 +++ 5 files changed, 54 insertions(+), 32 deletions(-) diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 7ef5d248b7..8db799db2c 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1764,7 +1764,6 @@ static void prepare_summary(summary& s, uint64_t expected_partition_count, uint3 throw malformed_sstable_exception("Current sampling level (" + to_sstring(downsampling::BASE_SAMPLING_LEVEL) + ") not enough to generate summary."); } - s.keys_written = 0; s.header.memory_size = 0; } @@ -1802,13 +1801,6 @@ static void prepare_compression(compression& c, const schema& schema) { c.init_full_checksum(); } -static void maybe_add_summary_entry(summary& s, const dht::token& token, bytes_view key, uint64_t offset) { - // Maybe add summary entry into in-memory representation of summary file. - if ((s.keys_written++ % s.header.min_index_interval) == 0) { - s.entries.push_back({ token, bytes(key.data(), key.size()), offset }); - } -} - static void populate_statistics_offsets(statistics& s) { @@ -1873,6 +1865,29 @@ static void seal_statistics(statistics& s, metadata_collector& collector, populate_statistics_offsets(s); } +void components_writer::maybe_add_summary_entry(summary& s, const dht::token& token, bytes_view key, uint64_t data_offset, + uint64_t index_offset, uint64_t& next_data_offset_to_write_summary) { + static constexpr size_t target_index_interval_size = 65536; + static constexpr size_t summary_byte_cost = 2000; // TODO: use configuration file for it. + + auto index_size_for_current_entry = index_offset - (s.entries.size() ? s.entries.back().position : 0); + + // generates a summary entry after 64 KB of index data *iff* we're writing 2000 (default value) to data + // for every 1 byte written to summary. 64 KB condition will prevent useless generation of summary entry + // for small key with lots of data. Both conditions will prevent summary from growing large for large + // keys with little data. + if (!next_data_offset_to_write_summary || (data_offset >= next_data_offset_to_write_summary && + index_size_for_current_entry >= target_index_interval_size)) { + next_data_offset_to_write_summary += summary_byte_cost * key.size(); + s.entries.push_back({ token, bytes(key.data(), key.size()), index_offset }); + } +} + +void components_writer::maybe_add_summary_entry(const dht::token& token, bytes_view key) { + return maybe_add_summary_entry(_sst._components->summary, token, key, get_offset(), + _index.offset(), _next_data_offset_to_write_summary); +} + // Returns offset into data component. uint64_t components_writer::get_offset() const { if (_sst.has_component(sstable::component_type::CompressionInfo)) { @@ -1929,7 +1944,7 @@ void components_writer::consume_new_partition(const dht::decorated_key& dk) { _partition_key = key::from_partition_key(_schema, dk.key()); - maybe_add_summary_entry(_sst._components->summary, dk.token(), bytes_view(*_partition_key), _index.offset()); + maybe_add_summary_entry(dk.token(), bytes_view(*_partition_key)); _sst._components->filter->add(bytes_view(*_partition_key)); _sst._collector.add_key(bytes_view(*_partition_key)); @@ -2210,16 +2225,19 @@ future<> sstable::generate_summary(const io_priority_class& pc) { sstlog.info("Summary file {} not found. Generating Summary...", filename(sstable::component_type::Summary)); class summary_generator { summary& _summary; + uint64_t _data_size; + uint64_t _next_data_offset_to_write_summary = 0; public: std::experimental::optional first_key, last_key; - summary_generator(summary& s) : _summary(s) {} + summary_generator(summary& s, uint64_t data_size) : _summary(s), _data_size(data_size) {} bool should_continue() { return true; } - void consume_entry(index_entry&& ie, uint64_t offset) { + void consume_entry(index_entry&& ie, uint64_t index_offset) { auto token = dht::global_partitioner().get_token(ie.get_key()); - maybe_add_summary_entry(_summary, token, ie.get_key_bytes(), offset); + components_writer::maybe_add_summary_entry(_summary, token, ie.get_key_bytes(), _data_size, index_offset, + _next_data_offset_to_write_summary); if (!first_key) { first_key = key(to_bytes(ie.get_key_bytes())); } else { @@ -2230,17 +2248,20 @@ future<> sstable::generate_summary(const io_priority_class& pc) { return open_checked_file_dma(_read_error_handler, filename(component_type::Index), open_flags::ro).then([this, &pc] (file index_file) { return do_with(std::move(index_file), [this, &pc] (file index_file) { - return index_file.size().then([this, &pc, index_file] (auto size) { + return seastar::when_all_succeed( + io_check([&] { return engine().file_size(this->filename(sstable::component_type::Data)); }), + index_file.size()).then([this, &pc, index_file] (auto data_size, auto index_size) { // an upper bound. Surely to be less than this. - auto estimated_partitions = size / sizeof(uint64_t); + auto estimated_partitions = index_size / sizeof(uint64_t); prepare_summary(_components->summary, estimated_partitions, _schema->min_index_interval()); file_input_stream_options options; options.buffer_size = sstable_buffer_size; options.io_priority_class = pc; - auto stream = make_file_input_stream(index_file, 0, size, std::move(options)); - return do_with(summary_generator(_components->summary), [this, &pc, stream = std::move(stream), size] (summary_generator& s) mutable { - auto ctx = make_lw_shared>(s, std::move(stream), 0, size); + auto stream = make_file_input_stream(index_file, 0, index_size, std::move(options)); + return do_with(summary_generator(_components->summary, data_size), + [this, &pc, stream = std::move(stream), index_size] (summary_generator& s) mutable { + auto ctx = make_lw_shared>(s, std::move(stream), 0, index_size); return ctx->consume_input(*ctx).finally([ctx] { return ctx->close(); }).then([this, ctx, &s] { diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 846817fc54..7ebf543163 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -797,7 +797,9 @@ class components_writer { // Remember first and last keys, which we need for the summary file. stdx::optional _first_key, _last_key; stdx::optional _partition_key; + uint64_t _next_data_offset_to_write_summary = 0; private: + void maybe_add_summary_entry(const dht::token& token, bytes_view key); uint64_t get_offset() const; file_writer index_file_writer(sstable& sst, const io_priority_class& pc); void ensure_tombstone_is_written() { @@ -810,7 +812,8 @@ public: ~components_writer(); components_writer(components_writer&& o) : _sst(o._sst), _schema(o._schema), _out(o._out), _index(std::move(o._index)), _index_needs_close(o._index_needs_close), _max_sstable_size(o._max_sstable_size), _tombstone_written(o._tombstone_written), - _first_key(std::move(o._first_key)), _last_key(std::move(o._last_key)), _partition_key(std::move(o._partition_key)) { + _first_key(std::move(o._first_key)), _last_key(std::move(o._last_key)), _partition_key(std::move(o._partition_key)), + _next_data_offset_to_write_summary(o._next_data_offset_to_write_summary) { o._index_needs_close = false; } @@ -821,6 +824,9 @@ public: stop_iteration consume(range_tombstone&& rt); stop_iteration consume_end_of_partition(); void consume_end_of_stream(); + + static void maybe_add_summary_entry(summary& s, const dht::token& token, bytes_view key, uint64_t data_offset, + uint64_t index_offset, uint64_t& next_data_offset_to_write_summary); }; class sstable_writer { diff --git a/sstables/types.hh b/sstables/types.hh index 7785f200ce..5f6b274696 100644 --- a/sstables/types.hh +++ b/sstables/types.hh @@ -219,10 +219,6 @@ struct summary_ka { disk_string first_key; disk_string last_key; - // Used to determine when a summary entry should be added based on min_index_interval. - // NOTE: keys_written isn't part of on-disk format of summary. - size_t keys_written; - // NOTE4: There is a structure written by Cassandra into the end of the Summary // file, after the field last_key, that we haven't understand yet, but we know // that its content isn't related to the summary itself. diff --git a/tests/sstable_datafile_test.cc b/tests/sstable_datafile_test.cc index d58de58c07..bf3b42d9a3 100644 --- a/tests/sstable_datafile_test.cc +++ b/tests/sstable_datafile_test.cc @@ -582,8 +582,7 @@ SEASTAR_TEST_CASE(datafile_generation_08) { const column_definition& r1_col = *s->get_column_definition("r1"); - // Create 150 partitions so that summary file store 2 entries, assuming min index - // interval is 128. + // TODO: generate sstable which will have 2 samples with size-based sampling. for (int32_t i = 0; i < 150; i++) { auto key = partition_key::from_exploded(*s, {int32_type->decompose(i)}); auto c_key = clustering_key::from_exploded(*s, {to_bytes("abc")}); @@ -605,13 +604,13 @@ SEASTAR_TEST_CASE(datafile_generation_08) { auto buf = bufptr.get(); size_t offset = 0; - std::vector header = { /* min_index_interval */ 0, 0, 0, 0x80, /* size */ 0, 0, 0, 2, - /* memory_size */ 0, 0, 0, 0, 0, 0, 0, 0x20, /* sampling_level */ 0, 0, 0, 0x80, - /* size_at_full_sampling */ 0, 0, 0, 2 }; + std::vector header = { /* min_index_interval */ 0, 0, 0, 0x80, /* size */ 0, 0, 0, 1, + /* memory_size */ 0, 0, 0, 0, 0, 0, 0, 0x10, /* sampling_level */ 0, 0, 0, 0x80, + /* size_at_full_sampling */ 0, 0, 0, 1 }; BOOST_REQUIRE(::memcmp(header.data(), &buf[offset], header.size()) == 0); offset += header.size(); - std::vector positions = { 0x8, 0, 0, 0, 0x14, 0, 0, 0 }; + std::vector positions = { 0x4, 0, 0, 0 }; BOOST_REQUIRE(::memcmp(positions.data(), &buf[offset], positions.size()) == 0); offset += positions.size(); @@ -619,10 +618,6 @@ SEASTAR_TEST_CASE(datafile_generation_08) { BOOST_REQUIRE(::memcmp(first_entry.data(), &buf[offset], first_entry.size()) == 0); offset += first_entry.size(); - std::vector second_entry = { /* key */ 0, 0, 0, 0x65, /* position */ 0, 0x9, 0, 0, 0, 0, 0, 0 }; - BOOST_REQUIRE(::memcmp(second_entry.data(), &buf[offset], second_entry.size()) == 0); - offset += second_entry.size(); - std::vector first_key = { 0, 0, 0, 0x4, 0, 0, 0, 0x17 }; BOOST_REQUIRE(::memcmp(first_key.data(), &buf[offset], first_key.size()) == 0); offset += first_key.size(); diff --git a/tests/sstable_test.cc b/tests/sstable_test.cc index 28e2acafea..fed7d49f54 100644 --- a/tests/sstable_test.cc +++ b/tests/sstable_test.cc @@ -196,9 +196,13 @@ SEASTAR_TEST_CASE(missing_summary_query_negative_fail) { return summary_query_fail<-uint64_t(2), 0, 5>(uncompressed_schema(), "tests/sstables/uncompressed", 2); } +// TODO: only one interval is generated with size-based sampling. Test it with a sstable that will actually result +// in two intervals. +#if 0 SEASTAR_TEST_CASE(missing_summary_interval_1_query_ok) { return summary_query<1, 19, 6>(uncompressed_schema(1), "tests/sstables/uncompressed", 2); } +#endif SEASTAR_TEST_CASE(missing_summary_first_last_sane) { return reusable_sst(uncompressed_schema(), "tests/sstables/uncompressed", 2).then([] (sstable_ptr ptr) { From 872412d31a2748f8c687860ca41148f276b1aaad Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Fri, 11 Aug 2017 01:22:58 -0300 Subject: [PATCH 4/5] db/config: introduce sstable_summary_ratio option Signed-off-by: Raphael S. Carvalho --- db/config.hh | 2 ++ sstables/sstables.cc | 20 +++++++++++++++----- sstables/sstables.hh | 7 +++++-- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/db/config.hh b/db/config.hh index dca6168da2..a320392d26 100644 --- a/db/config.hh +++ b/db/config.hh @@ -775,6 +775,8 @@ public: val(abort_on_lsa_bad_alloc, bool, false, Used, "Abort when allocation in LSA region fails") \ val(murmur3_partitioner_ignore_msb_bits, unsigned, 0, Used, "Number of most siginificant token bits to ignore in murmur3 partitioner; increase for very large clusters") \ val(virtual_dirty_soft_limit, double, 0.6, Used, "Soft limit of virtual dirty memory expressed as a portion of the hard limit") \ + val(sstable_summary_ratio, double, 0.0005, Used, "Enforces that 1 byte of summary is written for every N (2000 by default) " \ + "bytes written to data file. Value must be between 0 and 1.") \ /* done! */ #define _make_value_member(name, type, deflt, status, desc, ...) \ diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 8db799db2c..347de18c1e 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -68,6 +68,8 @@ namespace sstables { logging::logger sstlog("sstable"); +static const db::config& get_config(); + seastar::shared_ptr default_write_monitor() { static thread_local seastar::shared_ptr monitor = seastar::make_shared(); return monitor; @@ -1866,9 +1868,8 @@ static void seal_statistics(statistics& s, metadata_collector& collector, } void components_writer::maybe_add_summary_entry(summary& s, const dht::token& token, bytes_view key, uint64_t data_offset, - uint64_t index_offset, uint64_t& next_data_offset_to_write_summary) { + uint64_t index_offset, uint64_t& next_data_offset_to_write_summary, size_t summary_byte_cost) { static constexpr size_t target_index_interval_size = 65536; - static constexpr size_t summary_byte_cost = 2000; // TODO: use configuration file for it. auto index_size_for_current_entry = index_offset - (s.entries.size() ? s.entries.back().position : 0); @@ -1885,7 +1886,7 @@ void components_writer::maybe_add_summary_entry(summary& s, const dht::token& to void components_writer::maybe_add_summary_entry(const dht::token& token, bytes_view key) { return maybe_add_summary_entry(_sst._components->summary, token, key, get_offset(), - _index.offset(), _next_data_offset_to_write_summary); + _index.offset(), _next_data_offset_to_write_summary, _summary_byte_cost); } // Returns offset into data component. @@ -1918,6 +1919,13 @@ static const db::config& get_config() { } } +// Returns the cost for writing a byte to summary such that the ratio of summary +// to data will be 1 to cost by the time sstable is sealed. +static size_t summary_byte_cost() { + auto summary_ratio = get_config().sstable_summary_ratio(); + return summary_ratio ? (1 / summary_ratio) : components_writer::default_summary_byte_cost; +} + components_writer::components_writer(sstable& sst, const schema& s, file_writer& out, uint64_t estimated_partitions, const sstable_writer_config& cfg, @@ -1929,6 +1937,7 @@ components_writer::components_writer(sstable& sst, const schema& s, file_writer& , _index_needs_close(true) , _max_sstable_size(cfg.max_sstable_size) , _tombstone_written(false) + , _summary_byte_cost(summary_byte_cost()) { _sst._components->filter = utils::i_filter::get_filter(estimated_partitions, _schema.bloom_filter_fp_chance()); _sst._pi_write.desired_block_size = cfg.promoted_index_block_size.value_or(get_config().column_index_size_in_kb() * 1024); @@ -2226,18 +2235,19 @@ future<> sstable::generate_summary(const io_priority_class& pc) { class summary_generator { summary& _summary; uint64_t _data_size; + size_t _summary_byte_cost; uint64_t _next_data_offset_to_write_summary = 0; public: std::experimental::optional first_key, last_key; - summary_generator(summary& s, uint64_t data_size) : _summary(s), _data_size(data_size) {} + summary_generator(summary& s, uint64_t data_size) : _summary(s), _data_size(data_size), _summary_byte_cost(summary_byte_cost()) {} bool should_continue() { return true; } void consume_entry(index_entry&& ie, uint64_t index_offset) { auto token = dht::global_partitioner().get_token(ie.get_key()); components_writer::maybe_add_summary_entry(_summary, token, ie.get_key_bytes(), _data_size, index_offset, - _next_data_offset_to_write_summary); + _next_data_offset_to_write_summary, _summary_byte_cost); if (!first_key) { first_key = key(to_bytes(ie.get_key_bytes())); } else { diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 7ebf543163..ba48273007 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -798,6 +798,8 @@ class components_writer { stdx::optional _first_key, _last_key; stdx::optional _partition_key; uint64_t _next_data_offset_to_write_summary = 0; + // Enforces ratio of summary to data of 1 to N. + size_t _summary_byte_cost = default_summary_byte_cost; private: void maybe_add_summary_entry(const dht::token& token, bytes_view key); uint64_t get_offset() const; @@ -813,7 +815,7 @@ public: components_writer(components_writer&& o) : _sst(o._sst), _schema(o._schema), _out(o._out), _index(std::move(o._index)), _index_needs_close(o._index_needs_close), _max_sstable_size(o._max_sstable_size), _tombstone_written(o._tombstone_written), _first_key(std::move(o._first_key)), _last_key(std::move(o._last_key)), _partition_key(std::move(o._partition_key)), - _next_data_offset_to_write_summary(o._next_data_offset_to_write_summary) { + _next_data_offset_to_write_summary(o._next_data_offset_to_write_summary), _summary_byte_cost(o._summary_byte_cost) { o._index_needs_close = false; } @@ -825,8 +827,9 @@ public: stop_iteration consume_end_of_partition(); void consume_end_of_stream(); + static constexpr size_t default_summary_byte_cost = 2000; static void maybe_add_summary_entry(summary& s, const dht::token& token, bytes_view key, uint64_t data_offset, - uint64_t index_offset, uint64_t& next_data_offset_to_write_summary); + uint64_t index_offset, uint64_t& next_data_offset_to_write_summary, size_t summary_byte_cost); }; class sstable_writer { From 5124f94358b51f0ddc5e996f638e640180776528 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Fri, 11 Aug 2017 01:37:06 -0300 Subject: [PATCH 5/5] tests: test summary entry spanning more keys than min interval Signed-off-by: Raphael S. Carvalho --- tests/sstable_datafile_test.cc | 40 ++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/tests/sstable_datafile_test.cc b/tests/sstable_datafile_test.cc index bf3b42d9a3..a7c865031c 100644 --- a/tests/sstable_datafile_test.cc +++ b/tests/sstable_datafile_test.cc @@ -4052,3 +4052,43 @@ SEASTAR_TEST_CASE(sstable_expired_data_ratio) { } }); } + +SEASTAR_TEST_CASE(test_summary_entry_spanning_more_keys_than_min_interval) { + return seastar::async([] { + auto s = make_lw_shared(schema({}, some_keyspace, some_column_family, + {{"p1", int32_type}}, {{"c1", utf8_type}}, {{"r1", int32_type}}, {}, utf8_type)); + + const column_definition& r1_col = *s->get_column_definition("r1"); + std::vector mutations; + auto keys_written = 0; + for (auto i = 0; i < s->min_index_interval()*1.5; i++) { + auto key = partition_key::from_exploded(*s, {int32_type->decompose(i)}); + auto c_key = clustering_key::from_exploded(*s, {to_bytes("abc")}); + mutation m(key, s); + m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type->decompose(1))); + mutations.push_back(std::move(m)); + keys_written++; + } + + auto tmp = make_lw_shared(); + auto sst_gen = [s, tmp, gen = make_lw_shared(1)] () mutable { + return make_lw_shared(s, tmp->path, (*gen)++, la, big); + }; + auto sst = make_sstable_containing(sst_gen, mutations); + sst = reusable_sst(s, tmp->path, sst->generation()).get0(); + + summary& sum = sstables::test(sst).get_summary(); + BOOST_REQUIRE(sum.entries.size() == 1); + + std::set merged; + merged.insert(mutations.begin(), mutations.end()); + auto rd = assert_that(sst->as_mutation_source()(s)); + auto keys_read = 0; + for (auto&& m : merged) { + keys_read++; + rd.produces(m); + } + rd.produces_end_of_stream(); + BOOST_REQUIRE(keys_read == keys_written); + }); +}