diff --git a/db/config.hh b/db/config.hh index dca6168da2..a320392d26 100644 --- a/db/config.hh +++ b/db/config.hh @@ -775,6 +775,8 @@ public: val(abort_on_lsa_bad_alloc, bool, false, Used, "Abort when allocation in LSA region fails") \ val(murmur3_partitioner_ignore_msb_bits, unsigned, 0, Used, "Number of most siginificant token bits to ignore in murmur3 partitioner; increase for very large clusters") \ val(virtual_dirty_soft_limit, double, 0.6, Used, "Soft limit of virtual dirty memory expressed as a portion of the hard limit") \ + val(sstable_summary_ratio, double, 0.0005, Used, "Enforces that 1 byte of summary is written for every N (2000 by default) " \ + "bytes written to data file. Value must be between 0 and 1.") \ /* done! */ #define _make_value_member(name, type, deflt, status, desc, ...) \ diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 66ce6a8376..347de18c1e 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -68,6 +68,8 @@ namespace sstables { logging::logger sstlog("sstable"); +static const db::config& get_config(); + seastar::shared_ptr default_write_monitor() { static thread_local seastar::shared_ptr monitor = seastar::make_shared(); return monitor; @@ -1764,7 +1766,6 @@ static void prepare_summary(summary& s, uint64_t expected_partition_count, uint3 throw malformed_sstable_exception("Current sampling level (" + to_sstring(downsampling::BASE_SAMPLING_LEVEL) + ") not enough to generate summary."); } - s.keys_written = 0; s.header.memory_size = 0; } @@ -1802,13 +1803,6 @@ static void prepare_compression(compression& c, const schema& schema) { c.init_full_checksum(); } -static void maybe_add_summary_entry(summary& s, const dht::token& token, bytes_view key, uint64_t offset) { - // Maybe add summary entry into in-memory representation of summary file. - if ((s.keys_written++ % s.header.min_index_interval) == 0) { - s.entries.push_back({ token, bytes(key.data(), key.size()), offset }); - } -} - static void populate_statistics_offsets(statistics& s) { @@ -1873,8 +1867,30 @@ static void seal_statistics(statistics& s, metadata_collector& collector, populate_statistics_offsets(s); } +void components_writer::maybe_add_summary_entry(summary& s, const dht::token& token, bytes_view key, uint64_t data_offset, + uint64_t index_offset, uint64_t& next_data_offset_to_write_summary, size_t summary_byte_cost) { + static constexpr size_t target_index_interval_size = 65536; + + auto index_size_for_current_entry = index_offset - (s.entries.size() ? s.entries.back().position : 0); + + // generates a summary entry after 64 KB of index data *iff* we're writing 2000 (default value) to data + // for every 1 byte written to summary. 64 KB condition will prevent useless generation of summary entry + // for small key with lots of data. Both conditions will prevent summary from growing large for large + // keys with little data. + if (!next_data_offset_to_write_summary || (data_offset >= next_data_offset_to_write_summary && + index_size_for_current_entry >= target_index_interval_size)) { + next_data_offset_to_write_summary += summary_byte_cost * key.size(); + s.entries.push_back({ token, bytes(key.data(), key.size()), index_offset }); + } +} + +void components_writer::maybe_add_summary_entry(const dht::token& token, bytes_view key) { + return maybe_add_summary_entry(_sst._components->summary, token, key, get_offset(), + _index.offset(), _next_data_offset_to_write_summary, _summary_byte_cost); +} + // Returns offset into data component. -size_t components_writer::get_offset() { +uint64_t components_writer::get_offset() const { if (_sst.has_component(sstable::component_type::CompressionInfo)) { // Variable returned by compressed_file_length() is constantly updated by compressed output stream. return _sst._components->compression.compressed_file_length(); @@ -1903,6 +1919,13 @@ static const db::config& get_config() { } } +// Returns the cost for writing a byte to summary such that the ratio of summary +// to data will be 1 to cost by the time sstable is sealed. +static size_t summary_byte_cost() { + auto summary_ratio = get_config().sstable_summary_ratio(); + return summary_ratio ? (1 / summary_ratio) : components_writer::default_summary_byte_cost; +} + components_writer::components_writer(sstable& sst, const schema& s, file_writer& out, uint64_t estimated_partitions, const sstable_writer_config& cfg, @@ -1914,6 +1937,7 @@ components_writer::components_writer(sstable& sst, const schema& s, file_writer& , _index_needs_close(true) , _max_sstable_size(cfg.max_sstable_size) , _tombstone_written(false) + , _summary_byte_cost(summary_byte_cost()) { _sst._components->filter = utils::i_filter::get_filter(estimated_partitions, _schema.bloom_filter_fp_chance()); _sst._pi_write.desired_block_size = cfg.promoted_index_block_size.value_or(get_config().column_index_size_in_kb() * 1024); @@ -1929,7 +1953,7 @@ void components_writer::consume_new_partition(const dht::decorated_key& dk) { _partition_key = key::from_partition_key(_schema, dk.key()); - maybe_add_summary_entry(_sst._components->summary, dk.token(), bytes_view(*_partition_key), _index.offset()); + maybe_add_summary_entry(dk.token(), bytes_view(*_partition_key)); _sst._components->filter->add(bytes_view(*_partition_key)); _sst._collector.add_key(bytes_view(*_partition_key)); @@ -2210,16 +2234,20 @@ future<> sstable::generate_summary(const io_priority_class& pc) { sstlog.info("Summary file {} not found. Generating Summary...", filename(sstable::component_type::Summary)); class summary_generator { summary& _summary; + uint64_t _data_size; + size_t _summary_byte_cost; + uint64_t _next_data_offset_to_write_summary = 0; public: std::experimental::optional first_key, last_key; - summary_generator(summary& s) : _summary(s) {} + summary_generator(summary& s, uint64_t data_size) : _summary(s), _data_size(data_size), _summary_byte_cost(summary_byte_cost()) {} bool should_continue() { return true; } - void consume_entry(index_entry&& ie, uint64_t offset) { + void consume_entry(index_entry&& ie, uint64_t index_offset) { auto token = dht::global_partitioner().get_token(ie.get_key()); - maybe_add_summary_entry(_summary, token, ie.get_key_bytes(), offset); + components_writer::maybe_add_summary_entry(_summary, token, ie.get_key_bytes(), _data_size, index_offset, + _next_data_offset_to_write_summary, _summary_byte_cost); if (!first_key) { first_key = key(to_bytes(ie.get_key_bytes())); } else { @@ -2230,17 +2258,20 @@ future<> sstable::generate_summary(const io_priority_class& pc) { return open_checked_file_dma(_read_error_handler, filename(component_type::Index), open_flags::ro).then([this, &pc] (file index_file) { return do_with(std::move(index_file), [this, &pc] (file index_file) { - return index_file.size().then([this, &pc, index_file] (auto size) { + return seastar::when_all_succeed( + io_check([&] { return engine().file_size(this->filename(sstable::component_type::Data)); }), + index_file.size()).then([this, &pc, index_file] (auto data_size, auto index_size) { // an upper bound. Surely to be less than this. - auto estimated_partitions = size / sizeof(uint64_t); + auto estimated_partitions = index_size / sizeof(uint64_t); prepare_summary(_components->summary, estimated_partitions, _schema->min_index_interval()); file_input_stream_options options; options.buffer_size = sstable_buffer_size; options.io_priority_class = pc; - auto stream = make_file_input_stream(index_file, 0, size, std::move(options)); - return do_with(summary_generator(_components->summary), [this, &pc, stream = std::move(stream), size] (summary_generator& s) mutable { - auto ctx = make_lw_shared>(s, std::move(stream), 0, size); + auto stream = make_file_input_stream(index_file, 0, index_size, std::move(options)); + return do_with(summary_generator(_components->summary, data_size), + [this, &pc, stream = std::move(stream), index_size] (summary_generator& s) mutable { + auto ctx = make_lw_shared>(s, std::move(stream), 0, index_size); return ctx->consume_input(*ctx).finally([ctx] { return ctx->close(); }).then([this, ctx, &s] { diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 993cd5d92e..ba48273007 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -797,8 +797,12 @@ class components_writer { // Remember first and last keys, which we need for the summary file. stdx::optional _first_key, _last_key; stdx::optional _partition_key; + uint64_t _next_data_offset_to_write_summary = 0; + // Enforces ratio of summary to data of 1 to N. + size_t _summary_byte_cost = default_summary_byte_cost; private: - size_t get_offset(); + void maybe_add_summary_entry(const dht::token& token, bytes_view key); + uint64_t get_offset() const; file_writer index_file_writer(sstable& sst, const io_priority_class& pc); void ensure_tombstone_is_written() { if (!_tombstone_written) { @@ -810,7 +814,8 @@ public: ~components_writer(); components_writer(components_writer&& o) : _sst(o._sst), _schema(o._schema), _out(o._out), _index(std::move(o._index)), _index_needs_close(o._index_needs_close), _max_sstable_size(o._max_sstable_size), _tombstone_written(o._tombstone_written), - _first_key(std::move(o._first_key)), _last_key(std::move(o._last_key)), _partition_key(std::move(o._partition_key)) { + _first_key(std::move(o._first_key)), _last_key(std::move(o._last_key)), _partition_key(std::move(o._partition_key)), + _next_data_offset_to_write_summary(o._next_data_offset_to_write_summary), _summary_byte_cost(o._summary_byte_cost) { o._index_needs_close = false; } @@ -821,6 +826,10 @@ public: stop_iteration consume(range_tombstone&& rt); stop_iteration consume_end_of_partition(); void consume_end_of_stream(); + + static constexpr size_t default_summary_byte_cost = 2000; + static void maybe_add_summary_entry(summary& s, const dht::token& token, bytes_view key, uint64_t data_offset, + uint64_t index_offset, uint64_t& next_data_offset_to_write_summary, size_t summary_byte_cost); }; class sstable_writer { diff --git a/sstables/types.hh b/sstables/types.hh index 7785f200ce..5f6b274696 100644 --- a/sstables/types.hh +++ b/sstables/types.hh @@ -219,10 +219,6 @@ struct summary_ka { disk_string first_key; disk_string last_key; - // Used to determine when a summary entry should be added based on min_index_interval. - // NOTE: keys_written isn't part of on-disk format of summary. - size_t keys_written; - // NOTE4: There is a structure written by Cassandra into the end of the Summary // file, after the field last_key, that we haven't understand yet, but we know // that its content isn't related to the summary itself. diff --git a/sstables/writer.hh b/sstables/writer.hh index fe89f4a30f..8165f0710c 100644 --- a/sstables/writer.hh +++ b/sstables/writer.hh @@ -31,7 +31,7 @@ namespace sstables { class file_writer { output_stream _out; - size_t _offset = 0; + uint64_t _offset = 0; public: file_writer(file f, file_output_stream_options options) : _out(make_file_output_stream(std::move(f), std::move(options))) {} @@ -56,7 +56,7 @@ public: future<> close() { return _out.close(); } - size_t offset() { + uint64_t offset() const { return _offset; } }; diff --git a/tests/sstable_datafile_test.cc b/tests/sstable_datafile_test.cc index d58de58c07..a7c865031c 100644 --- a/tests/sstable_datafile_test.cc +++ b/tests/sstable_datafile_test.cc @@ -582,8 +582,7 @@ SEASTAR_TEST_CASE(datafile_generation_08) { const column_definition& r1_col = *s->get_column_definition("r1"); - // Create 150 partitions so that summary file store 2 entries, assuming min index - // interval is 128. + // TODO: generate sstable which will have 2 samples with size-based sampling. for (int32_t i = 0; i < 150; i++) { auto key = partition_key::from_exploded(*s, {int32_type->decompose(i)}); auto c_key = clustering_key::from_exploded(*s, {to_bytes("abc")}); @@ -605,13 +604,13 @@ SEASTAR_TEST_CASE(datafile_generation_08) { auto buf = bufptr.get(); size_t offset = 0; - std::vector header = { /* min_index_interval */ 0, 0, 0, 0x80, /* size */ 0, 0, 0, 2, - /* memory_size */ 0, 0, 0, 0, 0, 0, 0, 0x20, /* sampling_level */ 0, 0, 0, 0x80, - /* size_at_full_sampling */ 0, 0, 0, 2 }; + std::vector header = { /* min_index_interval */ 0, 0, 0, 0x80, /* size */ 0, 0, 0, 1, + /* memory_size */ 0, 0, 0, 0, 0, 0, 0, 0x10, /* sampling_level */ 0, 0, 0, 0x80, + /* size_at_full_sampling */ 0, 0, 0, 1 }; BOOST_REQUIRE(::memcmp(header.data(), &buf[offset], header.size()) == 0); offset += header.size(); - std::vector positions = { 0x8, 0, 0, 0, 0x14, 0, 0, 0 }; + std::vector positions = { 0x4, 0, 0, 0 }; BOOST_REQUIRE(::memcmp(positions.data(), &buf[offset], positions.size()) == 0); offset += positions.size(); @@ -619,10 +618,6 @@ SEASTAR_TEST_CASE(datafile_generation_08) { BOOST_REQUIRE(::memcmp(first_entry.data(), &buf[offset], first_entry.size()) == 0); offset += first_entry.size(); - std::vector second_entry = { /* key */ 0, 0, 0, 0x65, /* position */ 0, 0x9, 0, 0, 0, 0, 0, 0 }; - BOOST_REQUIRE(::memcmp(second_entry.data(), &buf[offset], second_entry.size()) == 0); - offset += second_entry.size(); - std::vector first_key = { 0, 0, 0, 0x4, 0, 0, 0, 0x17 }; BOOST_REQUIRE(::memcmp(first_key.data(), &buf[offset], first_key.size()) == 0); offset += first_key.size(); @@ -4057,3 +4052,43 @@ SEASTAR_TEST_CASE(sstable_expired_data_ratio) { } }); } + +SEASTAR_TEST_CASE(test_summary_entry_spanning_more_keys_than_min_interval) { + return seastar::async([] { + auto s = make_lw_shared(schema({}, some_keyspace, some_column_family, + {{"p1", int32_type}}, {{"c1", utf8_type}}, {{"r1", int32_type}}, {}, utf8_type)); + + const column_definition& r1_col = *s->get_column_definition("r1"); + std::vector mutations; + auto keys_written = 0; + for (auto i = 0; i < s->min_index_interval()*1.5; i++) { + auto key = partition_key::from_exploded(*s, {int32_type->decompose(i)}); + auto c_key = clustering_key::from_exploded(*s, {to_bytes("abc")}); + mutation m(key, s); + m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type->decompose(1))); + mutations.push_back(std::move(m)); + keys_written++; + } + + auto tmp = make_lw_shared(); + auto sst_gen = [s, tmp, gen = make_lw_shared(1)] () mutable { + return make_lw_shared(s, tmp->path, (*gen)++, la, big); + }; + auto sst = make_sstable_containing(sst_gen, mutations); + sst = reusable_sst(s, tmp->path, sst->generation()).get0(); + + summary& sum = sstables::test(sst).get_summary(); + BOOST_REQUIRE(sum.entries.size() == 1); + + std::set merged; + merged.insert(mutations.begin(), mutations.end()); + auto rd = assert_that(sst->as_mutation_source()(s)); + auto keys_read = 0; + for (auto&& m : merged) { + keys_read++; + rd.produces(m); + } + rd.produces_end_of_stream(); + BOOST_REQUIRE(keys_read == keys_written); + }); +} diff --git a/tests/sstable_test.cc b/tests/sstable_test.cc index 28e2acafea..fed7d49f54 100644 --- a/tests/sstable_test.cc +++ b/tests/sstable_test.cc @@ -196,9 +196,13 @@ SEASTAR_TEST_CASE(missing_summary_query_negative_fail) { return summary_query_fail<-uint64_t(2), 0, 5>(uncompressed_schema(), "tests/sstables/uncompressed", 2); } +// TODO: only one interval is generated with size-based sampling. Test it with a sstable that will actually result +// in two intervals. +#if 0 SEASTAR_TEST_CASE(missing_summary_interval_1_query_ok) { return summary_query<1, 19, 6>(uncompressed_schema(1), "tests/sstables/uncompressed", 2); } +#endif SEASTAR_TEST_CASE(missing_summary_first_last_sane) { return reusable_sst(uncompressed_schema(), "tests/sstables/uncompressed", 2).then([] (sstable_ptr ptr) {