sstables: mc: writer: Avoid double-serialization of the promoted index

This commit is contained in:
Tomasz Grabiec
2019-02-13 19:57:15 +01:00
parent a3de5581ce
commit 4e093bc3a4

View File

@@ -700,15 +700,19 @@ private:
_prev_row_start = pos;
maybe_add_pi_block();
}
void write_promoted_index(file_writer& writer);
void write_promoted_index(bytes_ostream& writer);
void consume(rt_marker&& marker);
void flush_tmp_bufs() {
void flush_tmp_bufs(file_writer& writer) {
for (auto&& buf : _tmp_bufs) {
_data_writer->write(buf);
writer.write(buf);
}
_tmp_bufs.clear();
}
void flush_tmp_bufs() {
flush_tmp_bufs(*_data_writer);
}
public:
writer(sstable& sst, const schema& s, uint64_t estimated_partitions,
@@ -1111,18 +1115,6 @@ void writer::write_row_body(bytes_ostream& writer, const clustering_row& row, bo
return write_cells(writer, column_kind::regular_column, row.cells(), properties, has_complex_deletion);
}
template<typename Func>
uint64_t calculate_write_size(Func&& func) {
uint64_t written_size = 0;
{
auto counting_writer = file_writer(make_sizing_output_stream(written_size));
func(counting_writer);
counting_writer.flush();
counting_writer.close();
}
return written_size;
}
// Find if any collection in the row contains a collection-wide tombstone
static bool row_has_complex_deletion(const schema& s, const row& r, column_kind kind) {
bool result = false;
@@ -1248,16 +1240,16 @@ static void write_clustering_prefix(W& writer, bound_kind_m kind,
write_clustering_prefix(writer, s, clustering, is_ephemerally_full);
}
void writer::write_promoted_index(file_writer& writer) {
void writer::write_promoted_index(bytes_ostream& writer) {
static constexpr size_t width_base = 65536;
write_vint(writer, _partition_header_length);
write(_sst.get_version(), writer, to_deletion_time(_pi_write_m.tomb));
write_vint(writer, _pi_write_m.promoted_index.size());
std::vector<uint32_t> offsets;
offsets.reserve(_pi_write_m.promoted_index.size());
uint64_t start = writer.offset();
uint64_t start = writer.size();
for (const pi_block& block: _pi_write_m.promoted_index) {
offsets.push_back(writer.offset() - start);
offsets.push_back(writer.size() - start);
write_clustering_prefix(writer, block.first.kind, _schema, block.first.clustering);
write_clustering_prefix(writer, block.last.kind, _schema, block.last.clustering);
write_vint(writer, block.offset);
@@ -1315,16 +1307,13 @@ stop_iteration writer::consume_end_of_partition() {
add_pi_block();
}
auto write_pi = [this] (file_writer& writer) {
return write_promoted_index(writer);
};
if (_pi_write_m.promoted_index.size() < 2) {
write_vint(*_index_writer, uint64_t(0));
} else {
uint64_t pi_size = calculate_write_size(write_pi);
write_promoted_index(_tmp_bufs);
uint64_t pi_size = _tmp_bufs.size();
write_vint(*_index_writer, pi_size);
write_pi(*_index_writer);
flush_tmp_bufs(*_index_writer);
}
// compute size of the current row.