From f9038d5d780cf64213ac5ba122fcd1530fa89bc1 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 21 Dec 2017 22:34:29 +0100 Subject: [PATCH] sstables: Handle consecutive range_tombstone fragments with same position In preparation for allowing fragment streams to produce range_tombstones with the same position. --- sstables/sstables.cc | 28 ++++++++++++++++++++++++---- sstables/sstables.hh | 8 +++++++- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/sstables/sstables.cc b/sstables/sstables.cc index ea5f671b61..cc92405f71 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -2083,6 +2083,7 @@ components_writer::components_writer(sstable& sst, const schema& s, file_writer& , _max_sstable_size(cfg.max_sstable_size) , _tombstone_written(false) , _summary_byte_cost(summary_byte_cost()) + , _range_tombstones(s) { _sst._components->filter = utils::i_filter::get_filter(estimated_partitions, _schema.bloom_filter_fp_chance()); _sst._pi_write.desired_block_size = cfg.promoted_index_block_size.value_or(get_config().column_index_size_in_kb() * 1024); @@ -2154,13 +2155,32 @@ stop_iteration components_writer::consume(static_row&& sr) { } stop_iteration components_writer::consume(clustering_row&& cr) { - ensure_tombstone_is_written(); + drain_tombstones(cr.position()); _sst.write_clustered_row(_out, _schema, cr); return stop_iteration::no; } -stop_iteration components_writer::consume(range_tombstone&& rt) { +void components_writer::drain_tombstones(position_in_partition_view pos) { ensure_tombstone_is_written(); + while (auto mfo = _range_tombstones.get_next(pos)) { + write_tombstone(std::move(mfo->as_mutable_range_tombstone())); + } +} + +void components_writer::drain_tombstones() { + ensure_tombstone_is_written(); + while (auto mfo = _range_tombstones.get_next()) { + write_tombstone(std::move(mfo->as_mutable_range_tombstone())); + } +} + +stop_iteration components_writer::consume(range_tombstone&& rt) { + drain_tombstones(rt.position()); + _range_tombstones.apply(std::move(rt)); + return stop_iteration::no; +} + +void components_writer::write_tombstone(range_tombstone&& rt) { auto start = composite::from_clustering_element(_schema, rt.start); auto start_marker = bound_kind_to_start_marker(rt.start_kind); auto end = composite::from_clustering_element(_schema, rt.end); @@ -2168,10 +2188,11 @@ stop_iteration components_writer::consume(range_tombstone&& rt) { auto tomb = rt.tomb; _sst.index_tombstone(_out, start, std::move(rt), start_marker); _sst.write_range_tombstone(_out, std::move(start), start_marker, std::move(end), end_marker, {}, tomb); - return stop_iteration::no; } stop_iteration components_writer::consume_end_of_partition() { + drain_tombstones(); + // If there is an incomplete block in the promoted index, write it too. // However, if the _promoted_index is still empty, don't add a single // chunk - better not output a promoted index at all in this case. @@ -2188,7 +2209,6 @@ stop_iteration components_writer::consume_end_of_partition() { _sst._pi_write.data = {}; _sst._pi_write.block_first_colname = {}; - ensure_tombstone_is_written(); int16_t end_of_row = 0; write(_out, end_of_row); diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 9b1aec8032..c1b1b2892b 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -828,10 +828,15 @@ class components_writer { uint64_t _next_data_offset_to_write_summary = 0; // Enforces ratio of summary to data of 1 to N. size_t _summary_byte_cost = default_summary_byte_cost; + range_tombstone_stream _range_tombstones; private: void maybe_add_summary_entry(const dht::token& token, bytes_view key); uint64_t get_offset() const; file_writer index_file_writer(sstable& sst, const io_priority_class& pc); + // Emits all tombstones which start before pos. + void drain_tombstones(position_in_partition_view pos); + void drain_tombstones(); + void write_tombstone(range_tombstone&&); void ensure_tombstone_is_written() { if (!_tombstone_written) { consume(tombstone()); @@ -843,7 +848,8 @@ public: components_writer(components_writer&& o) : _sst(o._sst), _schema(o._schema), _out(o._out), _index(std::move(o._index)), _index_needs_close(o._index_needs_close), _max_sstable_size(o._max_sstable_size), _tombstone_written(o._tombstone_written), _first_key(std::move(o._first_key)), _last_key(std::move(o._last_key)), _partition_key(std::move(o._partition_key)), - _next_data_offset_to_write_summary(o._next_data_offset_to_write_summary), _summary_byte_cost(o._summary_byte_cost) { + _next_data_offset_to_write_summary(o._next_data_offset_to_write_summary), _summary_byte_cost(o._summary_byte_cost), + _range_tombstones(std::move(o._range_tombstones)) { o._index_needs_close = false; }