sstables: Handle consecutive range_tombstone fragments with same position

In preparation for allowing fragment streams to produce range_tombstones
with the same position.
This commit is contained in:
Tomasz Grabiec
2017-12-21 22:34:29 +01:00
parent 92b89d576d
commit f9038d5d78
2 changed files with 31 additions and 5 deletions

View File

@@ -2083,6 +2083,7 @@ components_writer::components_writer(sstable& sst, const schema& s, file_writer&
, _max_sstable_size(cfg.max_sstable_size)
, _tombstone_written(false)
, _summary_byte_cost(summary_byte_cost())
, _range_tombstones(s)
{
_sst._components->filter = utils::i_filter::get_filter(estimated_partitions, _schema.bloom_filter_fp_chance());
_sst._pi_write.desired_block_size = cfg.promoted_index_block_size.value_or(get_config().column_index_size_in_kb() * 1024);
@@ -2154,13 +2155,32 @@ stop_iteration components_writer::consume(static_row&& sr) {
}
stop_iteration components_writer::consume(clustering_row&& cr) {
ensure_tombstone_is_written();
drain_tombstones(cr.position());
_sst.write_clustered_row(_out, _schema, cr);
return stop_iteration::no;
}
stop_iteration components_writer::consume(range_tombstone&& rt) {
void components_writer::drain_tombstones(position_in_partition_view pos) {
ensure_tombstone_is_written();
while (auto mfo = _range_tombstones.get_next(pos)) {
write_tombstone(std::move(mfo->as_mutable_range_tombstone()));
}
}
void components_writer::drain_tombstones() {
ensure_tombstone_is_written();
while (auto mfo = _range_tombstones.get_next()) {
write_tombstone(std::move(mfo->as_mutable_range_tombstone()));
}
}
stop_iteration components_writer::consume(range_tombstone&& rt) {
drain_tombstones(rt.position());
_range_tombstones.apply(std::move(rt));
return stop_iteration::no;
}
void components_writer::write_tombstone(range_tombstone&& rt) {
auto start = composite::from_clustering_element(_schema, rt.start);
auto start_marker = bound_kind_to_start_marker(rt.start_kind);
auto end = composite::from_clustering_element(_schema, rt.end);
@@ -2168,10 +2188,11 @@ stop_iteration components_writer::consume(range_tombstone&& rt) {
auto tomb = rt.tomb;
_sst.index_tombstone(_out, start, std::move(rt), start_marker);
_sst.write_range_tombstone(_out, std::move(start), start_marker, std::move(end), end_marker, {}, tomb);
return stop_iteration::no;
}
stop_iteration components_writer::consume_end_of_partition() {
drain_tombstones();
// If there is an incomplete block in the promoted index, write it too.
// However, if the _promoted_index is still empty, don't add a single
// chunk - better not output a promoted index at all in this case.
@@ -2188,7 +2209,6 @@ stop_iteration components_writer::consume_end_of_partition() {
_sst._pi_write.data = {};
_sst._pi_write.block_first_colname = {};
ensure_tombstone_is_written();
int16_t end_of_row = 0;
write(_out, end_of_row);

View File

@@ -828,10 +828,15 @@ class components_writer {
uint64_t _next_data_offset_to_write_summary = 0;
// Enforces ratio of summary to data of 1 to N.
size_t _summary_byte_cost = default_summary_byte_cost;
range_tombstone_stream _range_tombstones;
private:
void maybe_add_summary_entry(const dht::token& token, bytes_view key);
uint64_t get_offset() const;
file_writer index_file_writer(sstable& sst, const io_priority_class& pc);
// Emits all tombstones which start before pos.
void drain_tombstones(position_in_partition_view pos);
void drain_tombstones();
void write_tombstone(range_tombstone&&);
void ensure_tombstone_is_written() {
if (!_tombstone_written) {
consume(tombstone());
@@ -843,7 +848,8 @@ public:
components_writer(components_writer&& o) : _sst(o._sst), _schema(o._schema), _out(o._out), _index(std::move(o._index)),
_index_needs_close(o._index_needs_close), _max_sstable_size(o._max_sstable_size), _tombstone_written(o._tombstone_written),
_first_key(std::move(o._first_key)), _last_key(std::move(o._last_key)), _partition_key(std::move(o._partition_key)),
_next_data_offset_to_write_summary(o._next_data_offset_to_write_summary), _summary_byte_cost(o._summary_byte_cost) {
_next_data_offset_to_write_summary(o._next_data_offset_to_write_summary), _summary_byte_cost(o._summary_byte_cost),
_range_tombstones(std::move(o._range_tombstones)) {
o._index_needs_close = false;
}