diff --git a/sstables/sstables.cc b/sstables/sstables.cc
index 263d34e73f..a24dbc3a55 100644
--- a/sstables/sstables.cc
+++ b/sstables/sstables.cc
@@ -31,6 +31,7 @@
 #include "core/do_with.hh"
 #include "core/thread.hh"
 #include
+#include
 #include
 #include "types.hh"
@@ -56,6 +57,7 @@
 #include "checked-file-impl.hh"
 #include "disk-error-handler.hh"
+#include "service/storage_service.hh"

 thread_local disk_error_signal_type sstable_read_error;
 thread_local disk_error_signal_type sstable_write_error;
@@ -296,6 +298,12 @@ inline void write(file_writer& out, bytes_view s) {
     out.write(reinterpret_cast<const char*>(s.data()), s.size()).get();
 }

+inline void write(file_writer& out, bytes_ostream s) {
+    for (bytes_view fragment : s) {
+        write(out, fragment);
+    }
+}
+
 // All composite parsers must come after this
 template <typename First, typename... Rest>
 future<> parse(random_access_reader& in, First& first, Rest&&... rest) {
@@ -1066,6 +1074,108 @@ future<> sstable::load() {
     });
 }

+static void output_promoted_index_entry(bytes_ostream& promoted_index,
+        const bytes& first_col,
+        const bytes& last_col,
+        uint64_t offset, uint64_t width) {
+    char s[2];
+    write_be(s, uint16_t(first_col.size()));
+    promoted_index.write(s, 2);
+    promoted_index.write(first_col);
+    write_be(s, uint16_t(last_col.size()));
+    promoted_index.write(s, 2);
+    promoted_index.write(last_col);
+    char q[8];
+    write_be(q, uint64_t(offset));
+    promoted_index.write(q, 8);
+    write_be(q, uint64_t(width));
+    promoted_index.write(q, 8);
+}
+
+// FIXME: use this in write_column_name() instead of repeating the code
+static bytes serialize_colname(const composite& clustering_key,
+        const std::vector<bytes_view>& column_names, composite::eoc marker) {
+    auto c = composite::from_exploded(column_names, marker);
+    auto ck_bview = bytes_view(clustering_key);
+    // The marker is not a component, so if the last component is empty (IOW,
+    // it only serializes to the marker), we just replace the key's last byte
+    // with the marker. If, however, the component is not empty, the marker
+    // should be at the end of it, and we just join them together as we do
+    // for any normal component.
+    if (c.size() == 1) {
+        ck_bview.remove_suffix(1);
+    }
+    size_t sz = ck_bview.size() + c.size();
+    if (sz > std::numeric_limits<uint16_t>::max()) {
+        throw std::runtime_error(sprint("Column name too large (%d > %d)", sz, std::numeric_limits<uint16_t>::max()));
+    }
+    bytes colname(bytes::initialized_later(), sz);
+    std::copy(ck_bview.begin(), ck_bview.end(), colname.begin());
+    std::copy(c.get_bytes().begin(), c.get_bytes().end(), colname.begin() + ck_bview.size());
+    return colname;
+}
+
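For orientation, each entry that output_promoted_index_entry() appends above is laid out as: a 16-bit big-endian length followed by the block's first column name, the same for its last column name, then two 64-bit big-endian integers - the block's offset relative to the start of its partition in the data file, and its width in bytes. Below is a minimal, standalone decoding sketch of that layout (hypothetical names; this is not the patch's, nor Scylla's, actual reader):

    #include <cstdint>
    #include <string>

    // Hypothetical big-endian decoder; the patch itself uses seastar's
    // write_be() on the encoding side.
    static uint64_t read_be(const unsigned char* p, int n) {
        uint64_t v = 0;
        while (n--) {
            v = (v << 8) | *p++;
        }
        return v;
    }

    struct pi_entry {
        std::string first_col; // first column name in the block
        std::string last_col;  // last column name in the block
        uint64_t offset;       // block start, relative to the partition start
        uint64_t width;        // block length in bytes
    };

    // Decodes one promoted-index entry; returns a pointer just past it.
    static const unsigned char* parse_pi_entry(const unsigned char* p, pi_entry& e) {
        uint16_t len = read_be(p, 2); p += 2;
        e.first_col.assign(reinterpret_cast<const char*>(p), len); p += len;
        len = read_be(p, 2); p += 2;
        e.last_col.assign(reinterpret_cast<const char*>(p), len); p += len;
        e.offset = read_be(p, 8); p += 8;
        e.width = read_be(p, 8); p += 8;
        return p;
    }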
+// Call maybe_flush_pi_block() before writing the given sstable atom to the
+// output. This may start a new promoted-index block, depending on how much
+// data we've already written since the start of the current block. Starting
+// a new block involves both outputting the range of the old block to the
+// index file, and outputting again the currently-open range tombstones to
+// the data file.
+// TODO: currently, maybe_flush_pi_block serializes the column name on every
+// call, saving it in _pi_write.block_last_colname, which we need for closing
+// each block, as well as for closing the last block. We could instead save
+// just the unprocessed arguments, and serialize them only when needed at the
+// end of the block. For this we would need this function to take rvalue
+// references (so data is moved in), and to not use a vector of bytes_view
+// (which might be gone later).
+void sstable::maybe_flush_pi_block(file_writer& out,
+        const composite& clustering_key,
+        const std::vector<bytes_view>& column_names) {
+    bytes colname = serialize_colname(clustering_key, column_names, composite::eoc::none);
+    if (_pi_write.block_first_colname.empty()) {
+        // This is the first column in the partition, or the first column
+        // since we closed a promoted-index block. Remember its name and
+        // position - we'll need to write it to the promoted index.
+        _pi_write.block_start_offset = out.offset();
+        _pi_write.block_next_start_offset = out.offset() + _pi_write.desired_block_size;
+        _pi_write.block_first_colname = colname;
+        _pi_write.block_last_colname = std::move(colname);
+    } else if (out.offset() >= _pi_write.block_next_start_offset) {
+        // If we've written enough bytes to the partition since we last
+        // output a sample to the promoted index, output one now and start
+        // a new block.
+        output_promoted_index_entry(_pi_write.data,
+                _pi_write.block_first_colname,
+                _pi_write.block_last_colname,
+                _pi_write.block_start_offset - _c_stats.start_offset,
+                out.offset() - _pi_write.block_start_offset);
+        _pi_write.numblocks++;
+        _pi_write.block_start_offset = out.offset();
+        // Because the new block can be read without the previous blocks, we
+        // need to repeat the range tombstones which are still open.
+        // Note that block_start_offset is set before outputting those (so the
+        // new block includes them), but block_next_start_offset is set only
+        // afterwards - so even if we wrote a lot of open tombstones, we still
+        // get a full block size of new data.
+        if (!clustering_key.empty()) {
+            auto& rts = _pi_write.tombstone_accumulator->range_tombstones_for_row(
+                    clustering_key_prefix(clustering_key.values()));
+            for (const auto& rt : rts) {
+                auto start = composite::from_clustering_element(*_pi_write.schemap, rt.start);
+                auto end = composite::from_clustering_element(*_pi_write.schemap, rt.end);
+                write_range_tombstone(out,
+                        start, rt.start_kind, end, rt.end_kind, {}, rt.tomb);
+            }
+        }
+        _pi_write.block_next_start_offset = out.offset() + _pi_write.desired_block_size;
+        _pi_write.block_first_colname = colname;
+        _pi_write.block_last_colname = std::move(colname);
+    } else {
+        // Keep track of the last column in the partition - we'll need it to
+        // close the last block in the promoted index, unfortunately.
+        _pi_write.block_last_colname = std::move(colname);
+    }
+}
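Note that desired_block_size is only a lower bound on a block's width: a block is closed lazily, by the first column whose start offset has passed block_next_start_offset, so a block can exceed the threshold by up to one column (plus any re-emitted open tombstones). A self-contained toy simulation of just this bookkeeping, with hypothetical names and none of the sstable types:

    #include <cstdint>
    #include <cstdio>

    int main() {
        const uint64_t desired_block_size = 64 * 1024; // column_index_size_in_kb = 64
        const uint64_t column_sizes[] = {30000, 30000, 30000, 30000};
        uint64_t offset = 0, block_start = 0, next_start = 0;
        bool block_open = false;
        for (uint64_t sz : column_sizes) {
            if (!block_open) {
                // first column of the partition, or of a new block
                block_start = offset;
                next_start = offset + desired_block_size;
                block_open = true;
            } else if (offset >= next_start) {
                // block grew past the threshold: emit a sample, start anew
                std::printf("block: offset=%llu width=%llu\n",
                            (unsigned long long)block_start,
                            (unsigned long long)(offset - block_start));
                block_start = offset;
                next_start = offset + desired_block_size;
            }
            offset += sz; // "write" the column
        }
        // consume_end_of_partition() closes the final, partial block
        std::printf("last block: offset=%llu width=%llu\n",
                    (unsigned long long)block_start,
                    (unsigned long long)(offset - block_start));
        return 0;
    }

With four 30000-byte columns and a 64 KiB threshold this prints one 90000-byte block and one final 30000-byte block: the first sample is only emitted when the fourth column arrives, because writing the third pushed the offset past 64 KiB.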
 // @clustering_key: it's expected that the clustering key is already in its composite form.
 // NOTE: empty clustering key means that there is no clustering key.
 void sstable::write_column_name(file_writer& out, const composite& clustering_key, const std::vector<bytes_view>& column_names, composite::eoc marker) {
@@ -1223,6 +1333,7 @@ void sstable::write_collection(file_writer& out, const composite& clustering_key
     const bytes& column_name = cdef.name();
     write_range_tombstone(out, clustering_key, clustering_key, { bytes_view(column_name) }, mview.tomb);
     for (auto& cp: mview.cells) {
+        maybe_flush_pi_block(out, clustering_key, { column_name, cp.first });
         write_column_name(out, clustering_key, { column_name, cp.first });
         write_cell(out, cp.second);
     }
@@ -1234,11 +1345,22 @@ void sstable::write_clustered_row(file_writer& out, const schema& schema, const
     auto clustering_key = composite::from_clustering_element(schema, clustered_row.key());

     if (schema.is_compound() && !schema.is_dense()) {
+        maybe_flush_pi_block(out, clustering_key, { bytes_view() });
         write_row_marker(out, clustered_row.marker(), clustering_key);
     }
     // Before writing cells, range tombstone must be written if the row has any (deletable_row::t).
     if (clustered_row.tomb()) {
+        maybe_flush_pi_block(out, clustering_key, {});
         write_range_tombstone(out, clustering_key, clustering_key, {}, clustered_row.tomb());
+        // Because we may currently break a partition into promoted-index
+        // blocks in the middle of a clustered row, we also need to track the
+        // current row's tombstone - not just range tombstones - which may
+        // affect the beginning of a new block.
+        // TODO: consider starting a new block only between rows, so the
+        // following code can be dropped:
+        _pi_write.tombstone_accumulator->apply(range_tombstone(
+                clustered_row.key(), bound_kind::incl_start,
+                clustered_row.key(), bound_kind::incl_end, clustered_row.tomb()));
     }

     // Write all cells of a partition's row.
@@ -1256,14 +1378,18 @@ void sstable::write_clustered_row(file_writer& out, const schema& schema, const

         if (schema.is_compound()) {
             if (schema.is_dense()) {
+                maybe_flush_pi_block(out, composite(), { bytes_view(clustering_key) });
                 write_column_name(out, bytes_view(clustering_key));
             } else {
+                maybe_flush_pi_block(out, clustering_key, { bytes_view(column_name) });
                 write_column_name(out, clustering_key, { bytes_view(column_name) });
             }
         } else {
             if (schema.is_dense()) {
+                maybe_flush_pi_block(out, composite(), { bytes_view(clustered_row.key().get_component(schema, 0)) });
                 write_column_name(out, bytes_view(clustered_row.key().get_component(schema, 0)));
             } else {
+                maybe_flush_pi_block(out, composite(), { bytes_view(column_name) });
                 write_column_name(out, bytes_view(column_name));
             }
         }
@@ -1282,16 +1408,25 @@ void sstable::write_static_row(file_writer& out, const schema& schema, const row
         assert(column_definition.is_static());
         atomic_cell_view cell = c.as_atomic_cell();
         auto sp = composite::static_prefix(schema);
+        maybe_flush_pi_block(out, sp, { bytes_view(column_definition.name()) });
         write_column_name(out, sp, { bytes_view(column_definition.name()) });
         write_cell(out, cell);
     });
 }

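The next hunk splits the old write_index_entry() in two: write_index_header() runs when a partition starts, while the promoted-index part can only be appended once the whole partition has been consumed. For reference, here is the resulting on-disk layout of one index-file entry, as implied by the two writers below (an illustrative sketch, not an authoritative format spec; promoted_index_size counts everything that follows it):

    // One index-file entry, as written by write_index_header() followed
    // (at end of partition) by write_index_promoted():
    //
    //   u16 key length                          \ write_index_header()
    //   ... partition key bytes                  }
    //   u64 position of partition in data file  /
    //   u32 promoted_index_size (0 = none)      \ write_index_promoted()
    //   if non-zero:                             }
    //     u32 deltime.local_deletion_time        }\
    //     u64 deltime.marked_for_delete_at       } 16 bytes: the "+= 16"
    //     u32 numblocks                          }/
    //     ... numblocks promoted-index entries   }
    static_assert(4 + 8 + 4 == 16, "deltime (4 + 8) + numblocks (4)");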
-static void write_index_entry(file_writer& out, disk_string_view& key, uint64_t pos) {
-    // FIXME: support promoted indexes.
-    uint32_t promoted_index_size = 0;
+static void write_index_header(file_writer& out, disk_string_view& key, uint64_t pos) {
+    write(out, key, pos);
+}

-    write(out, key, pos, promoted_index_size);
+static void write_index_promoted(file_writer& out, bytes_ostream& promoted_index,
+        deletion_time deltime, uint32_t numblocks) {
+    uint32_t promoted_index_size = promoted_index.size();
+    if (promoted_index_size) {
+        promoted_index_size += 16 /* deltime + numblocks */;
+        write(out, promoted_index_size, deltime, numblocks, promoted_index);
+    } else {
+        write(out, promoted_index_size);
+    }
 }

 static void prepare_summary(summary& s, uint64_t expected_partition_count, uint32_t min_index_interval) {
@@ -1405,6 +1540,17 @@ file_writer components_writer::index_file_writer(sstable& sst, const io_priority
     return file_writer(sst._index_file, std::move(options));
 }

+// Get the currently loaded configuration, or the default configuration in
+// case none has been loaded (this happens, for example, in unit tests).
+static const db::config& get_config() {
+    if (service::get_storage_service().local_is_initialized()) {
+        return service::get_local_storage_service().db().local().get_config();
+    } else {
+        static db::config default_config;
+        return default_config;
+    }
+}
+
 components_writer::components_writer(sstable& sst, const schema& s, file_writer& out,
                                      uint64_t estimated_partitions, uint64_t max_sstable_size,
                                      const io_priority_class& pc)
@@ -1415,6 +1561,7 @@ components_writer::components_writer(sstable& sst, const schema& s, file_writer&
     , _max_sstable_size(max_sstable_size)
 {
     _sst._filter = utils::i_filter::get_filter(estimated_partitions, _schema.bloom_filter_fp_chance());
+    _sst._pi_write.desired_block_size = get_config().column_index_size_in_kb() * 1024;

     prepare_summary(_sst._summary, estimated_partitions, _schema.min_index_interval());

@@ -1435,7 +1582,17 @@ void components_writer::consume_new_partition(const dht::decorated_key& dk) {
     p_key.value = bytes_view(*_partition_key);

     // Write index file entry from partition key into index file.
-    write_index_entry(_index, p_key, _out.offset());
+    // Write an index entry minus the "promoted index" (sample of columns)
+    // part. We can only write that after processing the entire partition
+    // and collecting the sample of columns.
+    write_index_header(_index, p_key, _out.offset());
+    _sst._pi_write.data = {};
+    _sst._pi_write.numblocks = 0;
+    _sst._pi_write.deltime.local_deletion_time = std::numeric_limits<int32_t>::max();
+    _sst._pi_write.deltime.marked_for_delete_at = std::numeric_limits<int64_t>::min();
+    _sst._pi_write.block_start_offset = _out.offset();
+    _sst._pi_write.tombstone_accumulator = range_tombstone_accumulator(_schema, false);
+    _sst._pi_write.schemap = &_schema; // sadly we need this

     // Write partition key into data file.
     write(_out, p_key);
@@ -1461,6 +1618,8 @@ void components_writer::consume(tombstone t) {
     }
     write(_out, d);
     _tombstone_written = true;
+    // TODO: need to verify we don't do this twice?
+    _sst._pi_write.deltime = d;
 }

 stop_iteration components_writer::consume(static_row&& sr) {
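The deltime values set in consume_new_partition() above are the conventional "live" sentinel meaning "no partition tombstone": marked_for_delete_at at its minimum, local_deletion_time at its maximum. consume(tombstone) then overwrites them when the partition does carry a top-level tombstone. A minimal sketch of that convention, with a hypothetical type name (the real deletion_time is declared among the sstable on-disk structures):

    #include <cstdint>
    #include <limits>

    struct deletion_time_sketch {
        int32_t local_deletion_time;
        int64_t marked_for_delete_at;

        // "live" means no partition-level tombstone was ever applied
        bool live() const {
            return marked_for_delete_at == std::numeric_limits<int64_t>::min()
                && local_deletion_time == std::numeric_limits<int32_t>::max();
        }
    };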
@@ -1477,13 +1636,35 @@ stop_iteration components_writer::consume(clustering_row&& cr) {

 stop_iteration components_writer::consume(range_tombstone&& rt) {
     ensure_tombstone_is_written();
+    // Remember the range tombstone, so that when we need to open a new
+    // promoted-index block we can figure out which ranges are still open
+    // and need to be repeated in the data file. Note that apply() also
+    // drops ranges already closed by rt.start, so the accumulator doesn't
+    // grow boundlessly.
+    _sst._pi_write.tombstone_accumulator->apply(rt);
     auto start = composite::from_clustering_element(_schema, std::move(rt.start));
     auto end = composite::from_clustering_element(_schema, std::move(rt.end));
+    _sst.maybe_flush_pi_block(_out, start, {});
     _sst.write_range_tombstone(_out, std::move(start), rt.start_kind, std::move(end), rt.end_kind, {}, rt.tomb);
     return stop_iteration::no;
 }

 stop_iteration components_writer::consume_end_of_partition() {
+    // If there is an incomplete block in the promoted index, write it too.
+    // However, if _pi_write.data is still empty, don't add a single-block
+    // promoted index - better not to output a promoted index at all in
+    // that case.
+    if (!_sst._pi_write.data.empty() && !_sst._pi_write.block_first_colname.empty()) {
+        output_promoted_index_entry(_sst._pi_write.data,
+                _sst._pi_write.block_first_colname,
+                _sst._pi_write.block_last_colname,
+                _sst._pi_write.block_start_offset - _sst._c_stats.start_offset,
+                _out.offset() - _sst._pi_write.block_start_offset);
+        _sst._pi_write.numblocks++;
+        _sst._pi_write.block_first_colname = {};
+    }
+    write_index_promoted(_index, _sst._pi_write.data, _sst._pi_write.deltime,
+            _sst._pi_write.numblocks);
+    _sst._pi_write.data = {};
+
     ensure_tombstone_is_written();
     int16_t end_of_row = 0;
     write(_out, end_of_row);
diff --git a/sstables/sstables.hh b/sstables/sstables.hh
index 4c7fb53f2b..9be43c6989 100644
--- a/sstables/sstables.hh
+++ b/sstables/sstables.hh
@@ -407,6 +407,27 @@ private:
     uint64_t _filter_file_size = 0;
     uint64_t _bytes_on_disk = 0;

+    // _pi_write is used temporarily for building the promoted index
+    // (column sample) of one partition when writing a new sstable.
+    struct {
+        // Unfortunately we cannot output the promoted index directly to the
+        // index file, because it needs to be prefixed by its size.
+        bytes_ostream data;
+        uint32_t numblocks;
+        deletion_time deltime;
+        uint64_t block_start_offset;
+        uint64_t block_next_start_offset;
+        bytes block_first_colname;
+        bytes block_last_colname;
+        std::experimental::optional<range_tombstone_accumulator> tombstone_accumulator;
+        const schema* schemap;
+        size_t desired_block_size;
+    } _pi_write;
+
+    void maybe_flush_pi_block(file_writer& out,
+            const composite& clustering_key,
+            const std::vector<bytes_view>& column_names);
+
     sstring _ks;
     sstring _cf;
     sstring _dir;
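Finally, a sketch of what all this buys the read path: a reader looking for a column can seek straight to the first block whose last column name is not less than the sought name, instead of scanning the partition from its start. Hypothetical and simplified - it reuses the pi_entry shape from the earlier sketch, compares names byte-wise (the real comparator is type-aware), and scans linearly where a real reader would binary-search:

    #include <cstdint>
    #include <string>
    #include <vector>

    struct pi_entry {
        std::string first_col, last_col;
        uint64_t offset, width;
    };

    // Returns the block possibly holding `name`, or nullptr if `name` sorts
    // after the partition's last column. Blocks are ordered by column name.
    static const pi_entry* find_block(const std::vector<pi_entry>& blocks,
                                      const std::string& name) {
        for (const auto& b : blocks) {
            if (name <= b.last_col) {
                return &b; // read data-file range [offset, offset + width)
            }
        }
        return nullptr;
    }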