diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 145bb8facd..3bd8418b6e 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1078,14 +1078,14 @@ static void write_index_entry(file_writer& out, disk_string_view& key, static constexpr int BASE_SAMPLING_LEVEL = 128; -static void prepare_summary(summary& s, const memtable& mt) { - auto& all_partitions = mt.all_partitions(); - assert(all_partitions.size() >= 1); +static void prepare_summary(summary& s, size_t expected_partition_count) { + assert(expected_partition_count >= 1); s.header.min_index_interval = BASE_SAMPLING_LEVEL; s.header.sampling_level = BASE_SAMPLING_LEVEL; - - uint64_t max_expected_entries = (all_partitions.size() / BASE_SAMPLING_LEVEL) + !!(all_partitions.size() % BASE_SAMPLING_LEVEL); + uint64_t max_expected_entries = + (expected_partition_count / BASE_SAMPLING_LEVEL) + + !!(expected_partition_count % BASE_SAMPLING_LEVEL); // FIXME: handle case where max_expected_entries is greater than max value stored by uint32_t. if (max_expected_entries > std::numeric_limits::max()) { throw malformed_sstable_exception("Current sampling level (" + to_sstring(BASE_SAMPLING_LEVEL) + ") not enough to generate summary."); @@ -1097,9 +1097,10 @@ static void prepare_summary(summary& s, const memtable& mt) { s.header.memory_size = 0; } -static void seal_summary(summary& s, const memtable& mt) { - auto& all_partitions = mt.all_partitions(); - +static void seal_summary(summary& s, + std::experimental::optional&& first_key, + std::experimental::optional&& last_key, + const schema& schema) { s.header.size = s.entries.size(); s.header.size_at_full_sampling = s.header.size; @@ -1108,15 +1109,15 @@ static void seal_summary(summary& s, const memtable& mt) { s.positions.push_back(s.header.memory_size); s.header.memory_size += e.key.size() + sizeof(e.position); } + assert(first_key); // assume non-empty sstable + s.first_key.value = first_key->get_bytes(); - auto begin = all_partitions.begin(); - auto last = --all_partitions.end(); - - auto first_key = key::from_partition_key(*mt.schema(), begin->first._key); - s.first_key.value = std::move(first_key.get_bytes()); - - auto last_key = key::from_partition_key(*mt.schema(), last->first._key); - s.last_key.value = std::move(last_key.get_bytes()); + if (last_key) { + s.last_key.value = last_key->get_bytes(); + } else { + // An empty last_mutation indicates we had just one partition + s.last_key.value = s.first_key.value; + } } // In the beginning of the statistics file, there is a disk_hash used to @@ -1178,15 +1179,16 @@ static constexpr size_t sstable_buffer_size = 64*1024; /// /// @param out holds an output stream to data file. /// -void sstable::do_write_components(const memtable& mt, file_writer& out) { +void sstable::do_write_components(::mutation_reader mr, + size_t estimated_partitions, schema_ptr schema, file_writer& out) { auto index = make_shared(_index_file, sstable_buffer_size); - prepare_summary(_summary, mt); - auto filter_fp_chance = mt.schema()->bloom_filter_fp_chance(); + prepare_summary(_summary, estimated_partitions); + auto filter_fp_chance = schema->bloom_filter_fp_chance(); if (filter_fp_chance != 1.0) { _components.insert(component_type::Filter); } - _filter = utils::i_filter::get_filter(mt.all_partitions().size(), filter_fp_chance); + _filter = utils::i_filter::get_filter(estimated_partitions, filter_fp_chance); prepare_statistics(_statistics); @@ -1195,13 +1197,16 @@ void sstable::do_write_components(const memtable& mt, file_writer& out) { add_validation_metadata(_statistics, dht::global_partitioner().name(), filter_fp_chance); auto collector = make_lw_shared(); + // Remember first and last keys, which we need for the summary file. + std::experimental::optional first_key, last_key; + // Iterate through CQL partitions, then CQL rows, then CQL columns. // Each mt.all_partitions() entry is a set of clustered rows sharing the same partition key. - for (auto& partition_entry: mt.all_partitions()) { + while (mutation_opt mut = mr().get0()) { // FIXME: it's likely that we need to set both sstable_level and repaired_at at this point. // Set current index of data to later compute row size. _c_stats.start_offset = out.offset(); - auto partition_key = key::from_partition_key(*mt.schema(), partition_entry.first._key); + auto partition_key = key::from_partition_key(*schema, mut->key()); // Maybe add summary entry into in-memory representation of summary file. maybe_add_summary_entry(_summary, bytes_view(partition_key), index->offset()); _filter->add(bytes_view(partition_key)); @@ -1215,7 +1220,7 @@ void sstable::do_write_components(const memtable& mt, file_writer& out) { // Write partition key into data file. write(out, p_key); - auto tombstone = partition_entry.second.partition_tombstone(); + auto tombstone = mut->partition().partition_tombstone(); deletion_time d; if (tombstone) { @@ -1233,18 +1238,18 @@ void sstable::do_write_components(const memtable& mt, file_writer& out) { } write(out, d); - auto& partition = partition_entry.second; + auto& partition = mut->partition(); auto& static_row = partition.static_row(); - write_static_row(out, *mt.schema(), static_row); + write_static_row(out, *schema, static_row); for (const auto& rt: partition.row_tombstones()) { - auto prefix = composite::from_clustering_element(*mt.schema(), rt.prefix()); + auto prefix = composite::from_clustering_element(*schema, rt.prefix()); write_range_tombstone(out, prefix, {}, rt.t()); } // Write all CQL rows from a given mutation partition. for (auto& clustered_row: partition.clustered_rows()) { - write_clustered_row(out, *mt.schema(), clustered_row); + write_clustered_row(out, *schema, clustered_row); } int16_t end_of_row = 0; write(out, end_of_row); @@ -1254,8 +1259,14 @@ void sstable::do_write_components(const memtable& mt, file_writer& out) { // update is about merging column_stats with the data being stored by collector. collector->update(_c_stats); _c_stats.reset(); + if (!first_key) { + first_key = std::move(partition_key); + } else { + last_key = std::move(partition_key); + } + } - seal_summary(_summary, mt); + seal_summary(_summary, std::move(first_key), std::move(last_key), *schema); index->close().get(); @@ -1273,23 +1284,23 @@ void sstable::do_write_components(const memtable& mt, file_writer& out) { add_stats_metadata(_statistics, *collector); } -void sstable::prepare_write_components(const memtable& mt) { +void sstable::prepare_write_components(::mutation_reader mr, size_t estimated_partitions, schema_ptr schema) { // CRC component must only be present when compression isn't enabled. - bool checksum_file = mt.schema()->get_compressor() == compressor::none; + bool checksum_file = schema->get_compressor() == compressor::none; if (checksum_file) { auto w = make_shared(_data_file, sstable_buffer_size, checksum_file); _components.insert(component_type::CRC); - this->do_write_components(mt, *w); + this->do_write_components(std::move(mr), estimated_partitions, std::move(schema), *w); w->close().get(); write_digest(filename(sstable::component_type::Digest), w->full_checksum()).get(); write_crc(filename(sstable::component_type::CRC), w->finalize_checksum()).get(); } else { - prepare_compression(_compression, *mt.schema()); + prepare_compression(_compression, *schema); auto w = make_shared(make_compressed_file_output_stream(_data_file, &_compression)); _components.insert(component_type::CompressionInfo); - this->do_write_components(mt, *w); + this->do_write_components(std::move(mr), estimated_partitions, std::move(schema), *w); w->close().get(); write_digest(filename(sstable::component_type::Digest), _compression.full_checksum()).get(); @@ -1297,12 +1308,18 @@ void sstable::prepare_write_components(const memtable& mt) { } future<> sstable::write_components(const memtable& mt) { - return touch_directory(_dir).then([this, &mt] { - return create_data().then([this, &mt] { - auto w = [this] (const memtable& mt) { - this->prepare_write_components(mt); + return write_components(mt.make_reader(), + mt.all_partitions().size(), mt.schema()); +} + +future<> sstable::write_components(::mutation_reader mr, + size_t estimated_partitions, schema_ptr schema) { + return touch_directory(_dir).then([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema)] { + return create_data().then([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema)] { + auto w = [this] (::mutation_reader mr, size_t estimated_partitions, schema_ptr schema) { + this->prepare_write_components(std::move(mr), estimated_partitions, std::move(schema)); }; - return seastar::async(w, mt).then([this] { + return seastar::async(std::move(w), std::move(mr), estimated_partitions, std::move(schema)).then([this] { return write_summary(); }).then([this] { return write_filter(); diff --git a/sstables/sstables.hh b/sstables/sstables.hh index cdf1248224..4f9f377f1e 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -198,10 +198,14 @@ public: mutation_reader read_rows(schema_ptr schema); // Write sstable components from a memtable. - void do_write_components(const memtable& mt, file_writer& out); - void prepare_write_components(const memtable& mt); future<> write_components(const memtable& mt); + future<> write_components(::mutation_reader mr, + size_t estimated_partitions, schema_ptr schema); private: + void do_write_components(::mutation_reader mr, + size_t estimated_partitions, schema_ptr schema, file_writer& out); + void prepare_write_components(::mutation_reader mr, + size_t estimated_partitions, schema_ptr schema); static std::unordered_map> _version_string; static std::unordered_map> _format_string; static std::unordered_map> _component_map;