sstables: take generic mutation_reader instead of memtable
The current sstable write interface only knows how to write a memtable. For compaction, we also want it to be able to write the compaction's output, which we can represent as a mutation_reader. So this patch changes the sstable::write_components() method to accept a mutation_reader, and whatever else is needed (a schema and the number of partitions in the reader - or an estimate thereof). Signed-off-by: Nadav Har'El <nyh@cloudius-systems.com>
This commit is contained in:
@@ -1078,14 +1078,14 @@ static void write_index_entry(file_writer& out, disk_string_view<uint16_t>& key,
|
||||
|
||||
static constexpr int BASE_SAMPLING_LEVEL = 128;
|
||||
|
||||
static void prepare_summary(summary& s, const memtable& mt) {
|
||||
auto& all_partitions = mt.all_partitions();
|
||||
assert(all_partitions.size() >= 1);
|
||||
static void prepare_summary(summary& s, size_t expected_partition_count) {
|
||||
assert(expected_partition_count >= 1);
|
||||
|
||||
s.header.min_index_interval = BASE_SAMPLING_LEVEL;
|
||||
s.header.sampling_level = BASE_SAMPLING_LEVEL;
|
||||
|
||||
uint64_t max_expected_entries = (all_partitions.size() / BASE_SAMPLING_LEVEL) + !!(all_partitions.size() % BASE_SAMPLING_LEVEL);
|
||||
uint64_t max_expected_entries =
|
||||
(expected_partition_count / BASE_SAMPLING_LEVEL) +
|
||||
!!(expected_partition_count % BASE_SAMPLING_LEVEL);
|
||||
// FIXME: handle case where max_expected_entries is greater than max value stored by uint32_t.
|
||||
if (max_expected_entries > std::numeric_limits<uint32_t>::max()) {
|
||||
throw malformed_sstable_exception("Current sampling level (" + to_sstring(BASE_SAMPLING_LEVEL) + ") not enough to generate summary.");
|
||||
@@ -1097,9 +1097,10 @@ static void prepare_summary(summary& s, const memtable& mt) {
|
||||
s.header.memory_size = 0;
|
||||
}
|
||||
|
||||
static void seal_summary(summary& s, const memtable& mt) {
|
||||
auto& all_partitions = mt.all_partitions();
|
||||
|
||||
static void seal_summary(summary& s,
|
||||
std::experimental::optional<key>&& first_key,
|
||||
std::experimental::optional<key>&& last_key,
|
||||
const schema& schema) {
|
||||
s.header.size = s.entries.size();
|
||||
s.header.size_at_full_sampling = s.header.size;
|
||||
|
||||
@@ -1108,15 +1109,15 @@ static void seal_summary(summary& s, const memtable& mt) {
|
||||
s.positions.push_back(s.header.memory_size);
|
||||
s.header.memory_size += e.key.size() + sizeof(e.position);
|
||||
}
|
||||
assert(first_key); // assume non-empty sstable
|
||||
s.first_key.value = first_key->get_bytes();
|
||||
|
||||
auto begin = all_partitions.begin();
|
||||
auto last = --all_partitions.end();
|
||||
|
||||
auto first_key = key::from_partition_key(*mt.schema(), begin->first._key);
|
||||
s.first_key.value = std::move(first_key.get_bytes());
|
||||
|
||||
auto last_key = key::from_partition_key(*mt.schema(), last->first._key);
|
||||
s.last_key.value = std::move(last_key.get_bytes());
|
||||
if (last_key) {
|
||||
s.last_key.value = last_key->get_bytes();
|
||||
} else {
|
||||
// An empty last_mutation indicates we had just one partition
|
||||
s.last_key.value = s.first_key.value;
|
||||
}
|
||||
}
|
||||
|
||||
// In the beginning of the statistics file, there is a disk_hash used to
|
||||
@@ -1178,15 +1179,16 @@ static constexpr size_t sstable_buffer_size = 64*1024;
|
||||
///
|
||||
/// @param out holds an output stream to data file.
|
||||
///
|
||||
void sstable::do_write_components(const memtable& mt, file_writer& out) {
|
||||
void sstable::do_write_components(::mutation_reader mr,
|
||||
size_t estimated_partitions, schema_ptr schema, file_writer& out) {
|
||||
auto index = make_shared<file_writer>(_index_file, sstable_buffer_size);
|
||||
|
||||
prepare_summary(_summary, mt);
|
||||
auto filter_fp_chance = mt.schema()->bloom_filter_fp_chance();
|
||||
prepare_summary(_summary, estimated_partitions);
|
||||
auto filter_fp_chance = schema->bloom_filter_fp_chance();
|
||||
if (filter_fp_chance != 1.0) {
|
||||
_components.insert(component_type::Filter);
|
||||
}
|
||||
_filter = utils::i_filter::get_filter(mt.all_partitions().size(), filter_fp_chance);
|
||||
_filter = utils::i_filter::get_filter(estimated_partitions, filter_fp_chance);
|
||||
|
||||
prepare_statistics(_statistics);
|
||||
|
||||
@@ -1195,13 +1197,16 @@ void sstable::do_write_components(const memtable& mt, file_writer& out) {
|
||||
add_validation_metadata(_statistics, dht::global_partitioner().name(), filter_fp_chance);
|
||||
auto collector = make_lw_shared<metadata_collector>();
|
||||
|
||||
// Remember first and last keys, which we need for the summary file.
|
||||
std::experimental::optional<key> first_key, last_key;
|
||||
|
||||
// Iterate through CQL partitions, then CQL rows, then CQL columns.
|
||||
// Each mt.all_partitions() entry is a set of clustered rows sharing the same partition key.
|
||||
for (auto& partition_entry: mt.all_partitions()) {
|
||||
while (mutation_opt mut = mr().get0()) {
|
||||
// FIXME: it's likely that we need to set both sstable_level and repaired_at at this point.
|
||||
// Set current index of data to later compute row size.
|
||||
_c_stats.start_offset = out.offset();
|
||||
auto partition_key = key::from_partition_key(*mt.schema(), partition_entry.first._key);
|
||||
auto partition_key = key::from_partition_key(*schema, mut->key());
|
||||
// Maybe add summary entry into in-memory representation of summary file.
|
||||
maybe_add_summary_entry(_summary, bytes_view(partition_key), index->offset());
|
||||
_filter->add(bytes_view(partition_key));
|
||||
@@ -1215,7 +1220,7 @@ void sstable::do_write_components(const memtable& mt, file_writer& out) {
|
||||
// Write partition key into data file.
|
||||
write(out, p_key);
|
||||
|
||||
auto tombstone = partition_entry.second.partition_tombstone();
|
||||
auto tombstone = mut->partition().partition_tombstone();
|
||||
deletion_time d;
|
||||
|
||||
if (tombstone) {
|
||||
@@ -1233,18 +1238,18 @@ void sstable::do_write_components(const memtable& mt, file_writer& out) {
|
||||
}
|
||||
write(out, d);
|
||||
|
||||
auto& partition = partition_entry.second;
|
||||
auto& partition = mut->partition();
|
||||
auto& static_row = partition.static_row();
|
||||
|
||||
write_static_row(out, *mt.schema(), static_row);
|
||||
write_static_row(out, *schema, static_row);
|
||||
for (const auto& rt: partition.row_tombstones()) {
|
||||
auto prefix = composite::from_clustering_element(*mt.schema(), rt.prefix());
|
||||
auto prefix = composite::from_clustering_element(*schema, rt.prefix());
|
||||
write_range_tombstone(out, prefix, {}, rt.t());
|
||||
}
|
||||
|
||||
// Write all CQL rows from a given mutation partition.
|
||||
for (auto& clustered_row: partition.clustered_rows()) {
|
||||
write_clustered_row(out, *mt.schema(), clustered_row);
|
||||
write_clustered_row(out, *schema, clustered_row);
|
||||
}
|
||||
int16_t end_of_row = 0;
|
||||
write(out, end_of_row);
|
||||
@@ -1254,8 +1259,14 @@ void sstable::do_write_components(const memtable& mt, file_writer& out) {
|
||||
// update is about merging column_stats with the data being stored by collector.
|
||||
collector->update(_c_stats);
|
||||
_c_stats.reset();
|
||||
if (!first_key) {
|
||||
first_key = std::move(partition_key);
|
||||
} else {
|
||||
last_key = std::move(partition_key);
|
||||
}
|
||||
|
||||
}
|
||||
seal_summary(_summary, mt);
|
||||
seal_summary(_summary, std::move(first_key), std::move(last_key), *schema);
|
||||
|
||||
index->close().get();
|
||||
|
||||
@@ -1273,23 +1284,23 @@ void sstable::do_write_components(const memtable& mt, file_writer& out) {
|
||||
add_stats_metadata(_statistics, *collector);
|
||||
}
|
||||
|
||||
void sstable::prepare_write_components(const memtable& mt) {
|
||||
void sstable::prepare_write_components(::mutation_reader mr, size_t estimated_partitions, schema_ptr schema) {
|
||||
// CRC component must only be present when compression isn't enabled.
|
||||
bool checksum_file = mt.schema()->get_compressor() == compressor::none;
|
||||
bool checksum_file = schema->get_compressor() == compressor::none;
|
||||
|
||||
if (checksum_file) {
|
||||
auto w = make_shared<checksummed_file_writer>(_data_file, sstable_buffer_size, checksum_file);
|
||||
_components.insert(component_type::CRC);
|
||||
this->do_write_components(mt, *w);
|
||||
this->do_write_components(std::move(mr), estimated_partitions, std::move(schema), *w);
|
||||
w->close().get();
|
||||
|
||||
write_digest(filename(sstable::component_type::Digest), w->full_checksum()).get();
|
||||
write_crc(filename(sstable::component_type::CRC), w->finalize_checksum()).get();
|
||||
} else {
|
||||
prepare_compression(_compression, *mt.schema());
|
||||
prepare_compression(_compression, *schema);
|
||||
auto w = make_shared<file_writer>(make_compressed_file_output_stream(_data_file, &_compression));
|
||||
_components.insert(component_type::CompressionInfo);
|
||||
this->do_write_components(mt, *w);
|
||||
this->do_write_components(std::move(mr), estimated_partitions, std::move(schema), *w);
|
||||
w->close().get();
|
||||
|
||||
write_digest(filename(sstable::component_type::Digest), _compression.full_checksum()).get();
|
||||
@@ -1297,12 +1308,18 @@ void sstable::prepare_write_components(const memtable& mt) {
|
||||
}
|
||||
|
||||
future<> sstable::write_components(const memtable& mt) {
|
||||
return touch_directory(_dir).then([this, &mt] {
|
||||
return create_data().then([this, &mt] {
|
||||
auto w = [this] (const memtable& mt) {
|
||||
this->prepare_write_components(mt);
|
||||
return write_components(mt.make_reader(),
|
||||
mt.all_partitions().size(), mt.schema());
|
||||
}
|
||||
|
||||
future<> sstable::write_components(::mutation_reader mr,
|
||||
size_t estimated_partitions, schema_ptr schema) {
|
||||
return touch_directory(_dir).then([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema)] {
|
||||
return create_data().then([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema)] {
|
||||
auto w = [this] (::mutation_reader mr, size_t estimated_partitions, schema_ptr schema) {
|
||||
this->prepare_write_components(std::move(mr), estimated_partitions, std::move(schema));
|
||||
};
|
||||
return seastar::async(w, mt).then([this] {
|
||||
return seastar::async(std::move(w), std::move(mr), estimated_partitions, std::move(schema)).then([this] {
|
||||
return write_summary();
|
||||
}).then([this] {
|
||||
return write_filter();
|
||||
|
||||
@@ -198,10 +198,14 @@ public:
|
||||
mutation_reader read_rows(schema_ptr schema);
|
||||
|
||||
// Write sstable components from a memtable.
|
||||
void do_write_components(const memtable& mt, file_writer& out);
|
||||
void prepare_write_components(const memtable& mt);
|
||||
future<> write_components(const memtable& mt);
|
||||
future<> write_components(::mutation_reader mr,
|
||||
size_t estimated_partitions, schema_ptr schema);
|
||||
private:
|
||||
void do_write_components(::mutation_reader mr,
|
||||
size_t estimated_partitions, schema_ptr schema, file_writer& out);
|
||||
void prepare_write_components(::mutation_reader mr,
|
||||
size_t estimated_partitions, schema_ptr schema);
|
||||
static std::unordered_map<version_types, sstring, enum_hash<version_types>> _version_string;
|
||||
static std::unordered_map<format_types, sstring, enum_hash<format_types>> _format_string;
|
||||
static std::unordered_map<component_type, sstring, enum_hash<component_type>> _component_map;
|
||||
|
||||
Reference in New Issue
Block a user