sstables: promoted index write support
This patch adds writing of promoted index to sstables. The promoted index is basically a sample of columns and their positions for large partitions: The promoted index appears in the sstable's index file for partitions which are larger than 64 KB, and divides the partition to 64 KB blocks (as in Cassandra, this interval is configurable through the column_index_size_in_kb config parameter). Beyond modifying the index file, having a promoted index may also modify the data file: Since each of blocks may be read independently, we need to add in the beginning of each block the list of range tombstones that are still open at that position. See also https://github.com/scylladb/scylla/wiki/SSTables-Index-File Fixes #959 Signed-off-by: Nadav Har'El <nyh@scylladb.com>
This commit is contained in:
Notes:
Avi Kivity
2016-08-09 16:57:14 +03:00
backport: 1.3
@@ -31,6 +31,7 @@
|
||||
#include "core/do_with.hh"
|
||||
#include "core/thread.hh"
|
||||
#include <seastar/core/shared_future.hh>
|
||||
#include <seastar/core/byteorder.hh>
|
||||
#include <iterator>
|
||||
|
||||
#include "types.hh"
|
||||
@@ -56,6 +57,7 @@
|
||||
|
||||
#include "checked-file-impl.hh"
|
||||
#include "disk-error-handler.hh"
|
||||
#include "service/storage_service.hh"
|
||||
|
||||
thread_local disk_error_signal_type sstable_read_error;
|
||||
thread_local disk_error_signal_type sstable_write_error;
|
||||
@@ -296,6 +298,12 @@ inline void write(file_writer& out, bytes_view s) {
|
||||
out.write(reinterpret_cast<const char*>(s.data()), s.size()).get();
|
||||
}
|
||||
|
||||
inline void write(file_writer& out, bytes_ostream s) {
|
||||
for (bytes_view fragment : s) {
|
||||
write(out, fragment);
|
||||
}
|
||||
}
|
||||
|
||||
// All composite parsers must come after this
|
||||
template<typename First, typename... Rest>
|
||||
future<> parse(random_access_reader& in, First& first, Rest&&... rest) {
|
||||
@@ -1066,6 +1074,108 @@ future<> sstable::load() {
|
||||
});
|
||||
}
|
||||
|
||||
static void output_promoted_index_entry(bytes_ostream& promoted_index,
|
||||
const bytes& first_col,
|
||||
const bytes& last_col,
|
||||
uint64_t offset, uint64_t width) {
|
||||
char s[2];
|
||||
write_be(s, uint16_t(first_col.size()));
|
||||
promoted_index.write(s, 2);
|
||||
promoted_index.write(first_col);
|
||||
write_be(s, uint16_t(last_col.size()));
|
||||
promoted_index.write(s, 2);
|
||||
promoted_index.write(last_col);
|
||||
char q[8];
|
||||
write_be(q, uint64_t(offset));
|
||||
promoted_index.write(q, 8);
|
||||
write_be(q, uint64_t(width));
|
||||
promoted_index.write(q, 8);
|
||||
}
|
||||
|
||||
// FIXME: use this in write_column_name() instead of repeating the code
|
||||
static bytes serialize_colname(const composite& clustering_key,
|
||||
const std::vector<bytes_view>& column_names, composite::eoc marker) {
|
||||
auto c = composite::from_exploded(column_names, marker);
|
||||
auto ck_bview = bytes_view(clustering_key);
|
||||
// The marker is not a component, so if the last component is empty (IOW,
|
||||
// only serializes to the marker), then we just replace the key's last byte
|
||||
// with the marker. If the component however it is not empty, then the
|
||||
// marker should be in the end of it, and we just join them together as we
|
||||
// do for any normal component
|
||||
if (c.size() == 1) {
|
||||
ck_bview.remove_suffix(1);
|
||||
}
|
||||
size_t sz = ck_bview.size() + c.size();
|
||||
if (sz > std::numeric_limits<uint16_t>::max()) {
|
||||
throw std::runtime_error(sprint("Column name too large (%d > %d)", sz, std::numeric_limits<uint16_t>::max()));
|
||||
}
|
||||
bytes colname(bytes::initialized_later(), sz);
|
||||
std::copy(ck_bview.begin(), ck_bview.end(), colname.begin());
|
||||
std::copy(c.get_bytes().begin(), c.get_bytes().end(), colname.begin() + ck_bview.size());
|
||||
return colname;
|
||||
}
|
||||
|
||||
// Call maybe_flush_pi_block() before writing the given sstable atom to the
|
||||
// output. This may start a new promoted-index block depending on how much
|
||||
// data we've already written since the start of the current block. Starting
|
||||
// a new block involves both outputting the range of the old block to the
|
||||
// index file, and outputting again the currently-open range tombstones to
|
||||
// the data file.
|
||||
// TODO: currently, maybe_flush_pi_block serializes the column name on every
|
||||
// call, saving it in _pi_write.block_last_colname which we need for closing
|
||||
// each block, as well as for closing the last block. We could instead save
|
||||
// just the unprocessed arguments, and serialize them only when needed at the
|
||||
// end of the block. For this we would need this function to take rvalue
|
||||
// references (so data is moved in), and need not to use vector of byte_view
|
||||
// (which might be gone later).
|
||||
void sstable::maybe_flush_pi_block(file_writer& out,
|
||||
const composite& clustering_key,
|
||||
const std::vector<bytes_view>& column_names) {
|
||||
bytes colname = serialize_colname(clustering_key, column_names, composite::eoc::none);
|
||||
if (_pi_write.block_first_colname.empty()) {
|
||||
// This is the first column in the partition, or first column since we
|
||||
// closed a promoted-index block. Remember its name and position -
|
||||
// we'll need to write it to the promoted index.
|
||||
_pi_write.block_start_offset = out.offset();
|
||||
_pi_write.block_next_start_offset = out.offset() + _pi_write.desired_block_size;
|
||||
_pi_write.block_first_colname = colname;
|
||||
_pi_write.block_last_colname = std::move(colname);
|
||||
} else if (out.offset() >= _pi_write.block_next_start_offset) {
|
||||
// If we wrote enough bytes to the partition since we output a sample
|
||||
// to the promoted index, output one now and start a new one.
|
||||
output_promoted_index_entry(_pi_write.data,
|
||||
_pi_write.block_first_colname,
|
||||
_pi_write.block_last_colname,
|
||||
_pi_write.block_start_offset - _c_stats.start_offset,
|
||||
out.offset() - _pi_write.block_start_offset);
|
||||
_pi_write.numblocks++;
|
||||
_pi_write.block_start_offset = out.offset();
|
||||
// Because the new block can be read without the previous blocks, we
|
||||
// need to repeat the range tombstones which are still open.
|
||||
// Note that block_start_offset is before outputting those (so the new
|
||||
// block includes them), but we set block_next_start_offset after - so
|
||||
// even if we wrote a lot of open tombstones, we still get a full
|
||||
// block size of new data.
|
||||
if (!clustering_key.empty()) {
|
||||
auto& rts = _pi_write.tombstone_accumulator->range_tombstones_for_row(
|
||||
clustering_key_prefix(clustering_key.values()));
|
||||
for (const auto& rt : rts) {
|
||||
auto start = composite::from_clustering_element(*_pi_write.schemap, rt.start);
|
||||
auto end = composite::from_clustering_element(*_pi_write.schemap, rt.end);
|
||||
write_range_tombstone(out,
|
||||
start, rt.start_kind, end, rt.end_kind, {}, rt.tomb);
|
||||
}
|
||||
}
|
||||
_pi_write.block_next_start_offset = out.offset() + _pi_write.desired_block_size;
|
||||
_pi_write.block_first_colname = colname;
|
||||
_pi_write.block_last_colname = std::move(colname);
|
||||
} else {
|
||||
// Keep track of the last column in the partition - we'll need it to close
|
||||
// the last block in the promoted index, unfortunately.
|
||||
_pi_write.block_last_colname = std::move(colname);
|
||||
}
|
||||
}
|
||||
|
||||
// @clustering_key: it's expected that clustering key is already in its composite form.
|
||||
// NOTE: empty clustering key means that there is no clustering key.
|
||||
void sstable::write_column_name(file_writer& out, const composite& clustering_key, const std::vector<bytes_view>& column_names, composite::eoc marker) {
|
||||
@@ -1223,6 +1333,7 @@ void sstable::write_collection(file_writer& out, const composite& clustering_key
|
||||
const bytes& column_name = cdef.name();
|
||||
write_range_tombstone(out, clustering_key, clustering_key, { bytes_view(column_name) }, mview.tomb);
|
||||
for (auto& cp: mview.cells) {
|
||||
maybe_flush_pi_block(out, clustering_key, { column_name, cp.first });
|
||||
write_column_name(out, clustering_key, { column_name, cp.first });
|
||||
write_cell(out, cp.second);
|
||||
}
|
||||
@@ -1234,11 +1345,22 @@ void sstable::write_clustered_row(file_writer& out, const schema& schema, const
|
||||
auto clustering_key = composite::from_clustering_element(schema, clustered_row.key());
|
||||
|
||||
if (schema.is_compound() && !schema.is_dense()) {
|
||||
maybe_flush_pi_block(out, clustering_key, { bytes_view() });
|
||||
write_row_marker(out, clustered_row.marker(), clustering_key);
|
||||
}
|
||||
// Before writing cells, range tombstone must be written if the row has any (deletable_row::t).
|
||||
if (clustered_row.tomb()) {
|
||||
maybe_flush_pi_block(out, clustering_key, {});
|
||||
write_range_tombstone(out, clustering_key, clustering_key, {}, clustered_row.tomb());
|
||||
// Because we currently may break a partition to promoted-index blocks
|
||||
// in the middle of a clustered row, we also need to track the current
|
||||
// row's tombstone - not just range tombstones - which may effect the
|
||||
// beginning of a new block.
|
||||
// TODO: consider starting a new block only between rows, so the
|
||||
// following code can be dropped:
|
||||
_pi_write.tombstone_accumulator->apply(range_tombstone(
|
||||
clustered_row.key(), bound_kind::incl_start,
|
||||
clustered_row.key(), bound_kind::incl_end, clustered_row.tomb()));
|
||||
}
|
||||
|
||||
// Write all cells of a partition's row.
|
||||
@@ -1256,14 +1378,18 @@ void sstable::write_clustered_row(file_writer& out, const schema& schema, const
|
||||
|
||||
if (schema.is_compound()) {
|
||||
if (schema.is_dense()) {
|
||||
maybe_flush_pi_block(out, composite(), { bytes_view(clustering_key) });
|
||||
write_column_name(out, bytes_view(clustering_key));
|
||||
} else {
|
||||
maybe_flush_pi_block(out, clustering_key, { bytes_view(column_name) });
|
||||
write_column_name(out, clustering_key, { bytes_view(column_name) });
|
||||
}
|
||||
} else {
|
||||
if (schema.is_dense()) {
|
||||
maybe_flush_pi_block(out, composite(), { bytes_view(clustered_row.key().get_component(schema, 0)) });
|
||||
write_column_name(out, bytes_view(clustered_row.key().get_component(schema, 0)));
|
||||
} else {
|
||||
maybe_flush_pi_block(out, composite(), { bytes_view(column_name) });
|
||||
write_column_name(out, bytes_view(column_name));
|
||||
}
|
||||
}
|
||||
@@ -1282,16 +1408,25 @@ void sstable::write_static_row(file_writer& out, const schema& schema, const row
|
||||
assert(column_definition.is_static());
|
||||
atomic_cell_view cell = c.as_atomic_cell();
|
||||
auto sp = composite::static_prefix(schema);
|
||||
maybe_flush_pi_block(out, sp, { bytes_view(column_definition.name()) });
|
||||
write_column_name(out, sp, { bytes_view(column_definition.name()) });
|
||||
write_cell(out, cell);
|
||||
});
|
||||
}
|
||||
|
||||
static void write_index_entry(file_writer& out, disk_string_view<uint16_t>& key, uint64_t pos) {
|
||||
// FIXME: support promoted indexes.
|
||||
uint32_t promoted_index_size = 0;
|
||||
static void write_index_header(file_writer& out, disk_string_view<uint16_t>& key, uint64_t pos) {
|
||||
write(out, key, pos);
|
||||
}
|
||||
|
||||
write(out, key, pos, promoted_index_size);
|
||||
static void write_index_promoted(file_writer& out, bytes_ostream& promoted_index,
|
||||
deletion_time deltime, uint32_t numblocks) {
|
||||
uint32_t promoted_index_size = promoted_index.size();
|
||||
if (promoted_index_size) {
|
||||
promoted_index_size += 16 /* deltime + numblocks */;
|
||||
write(out, promoted_index_size, deltime, numblocks, promoted_index);
|
||||
} else {
|
||||
write(out, promoted_index_size);
|
||||
}
|
||||
}
|
||||
|
||||
static void prepare_summary(summary& s, uint64_t expected_partition_count, uint32_t min_index_interval) {
|
||||
@@ -1405,6 +1540,17 @@ file_writer components_writer::index_file_writer(sstable& sst, const io_priority
|
||||
return file_writer(sst._index_file, std::move(options));
|
||||
}
|
||||
|
||||
// Get the currently loaded configuration, or the default configuration in
|
||||
// case none has been loaded (this happens, for example, in unit tests).
|
||||
static const db::config& get_config() {
|
||||
if (service::get_storage_service().local_is_initialized()) {
|
||||
return service::get_local_storage_service().db().local().get_config();
|
||||
} else {
|
||||
static db::config default_config;
|
||||
return default_config;
|
||||
}
|
||||
}
|
||||
|
||||
components_writer::components_writer(sstable& sst, const schema& s, file_writer& out,
|
||||
uint64_t estimated_partitions, uint64_t max_sstable_size,
|
||||
const io_priority_class& pc)
|
||||
@@ -1415,6 +1561,7 @@ components_writer::components_writer(sstable& sst, const schema& s, file_writer&
|
||||
, _max_sstable_size(max_sstable_size)
|
||||
{
|
||||
_sst._filter = utils::i_filter::get_filter(estimated_partitions, _schema.bloom_filter_fp_chance());
|
||||
_sst._pi_write.desired_block_size = get_config().column_index_size_in_kb() * 1024;
|
||||
|
||||
prepare_summary(_sst._summary, estimated_partitions, _schema.min_index_interval());
|
||||
|
||||
@@ -1435,7 +1582,17 @@ void components_writer::consume_new_partition(const dht::decorated_key& dk) {
|
||||
p_key.value = bytes_view(*_partition_key);
|
||||
|
||||
// Write index file entry from partition key into index file.
|
||||
write_index_entry(_index, p_key, _out.offset());
|
||||
// Write an index entry minus the "promoted index" (sample of columns)
|
||||
// part. We can only write that after processing the entire partition
|
||||
// and collecting the sample of columns.
|
||||
write_index_header(_index, p_key, _out.offset());
|
||||
_sst._pi_write.data = {};
|
||||
_sst._pi_write.numblocks = 0;
|
||||
_sst._pi_write.deltime.local_deletion_time = std::numeric_limits<int32_t>::max();
|
||||
_sst._pi_write.deltime.marked_for_delete_at = std::numeric_limits<int64_t>::min();
|
||||
_sst._pi_write.block_start_offset = _out.offset();
|
||||
_sst._pi_write.tombstone_accumulator = range_tombstone_accumulator(_schema, false);
|
||||
_sst._pi_write.schemap = &_schema; // sadly we need this
|
||||
|
||||
// Write partition key into data file.
|
||||
write(_out, p_key);
|
||||
@@ -1461,6 +1618,8 @@ void components_writer::consume(tombstone t) {
|
||||
}
|
||||
write(_out, d);
|
||||
_tombstone_written = true;
|
||||
// TODO: need to verify we don't do this twice?
|
||||
_sst._pi_write.deltime = d;
|
||||
}
|
||||
|
||||
stop_iteration components_writer::consume(static_row&& sr) {
|
||||
@@ -1477,13 +1636,35 @@ stop_iteration components_writer::consume(clustering_row&& cr) {
|
||||
|
||||
stop_iteration components_writer::consume(range_tombstone&& rt) {
|
||||
ensure_tombstone_is_written();
|
||||
// Remember the range tombstone so when we need to open a new promoted
|
||||
// index block, we can figure out which ranges are still open and need
|
||||
// to be repeated in the data file. Note that apply() also drops ranges
|
||||
// already closed by rt.start, so the accumulator doesn't grow boundless.
|
||||
_sst._pi_write.tombstone_accumulator->apply(rt);
|
||||
auto start = composite::from_clustering_element(_schema, std::move(rt.start));
|
||||
auto end = composite::from_clustering_element(_schema, std::move(rt.end));
|
||||
_sst.maybe_flush_pi_block(_out, start, {});
|
||||
_sst.write_range_tombstone(_out, std::move(start), rt.start_kind, std::move(end), rt.end_kind, {}, rt.tomb);
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
stop_iteration components_writer::consume_end_of_partition() {
|
||||
// If there is an incomplete block in the promoted index, write it too.
|
||||
// However, if the _promoted_index is still empty, don't add a single
|
||||
// chunk - better not output a promoted index at all in this case.
|
||||
if (!_sst._pi_write.data.empty() && !_sst._pi_write.block_first_colname.empty()) {
|
||||
output_promoted_index_entry(_sst._pi_write.data,
|
||||
_sst._pi_write.block_first_colname,
|
||||
_sst._pi_write.block_last_colname,
|
||||
_sst._pi_write.block_start_offset - _sst._c_stats.start_offset,
|
||||
_out.offset() - _sst._pi_write.block_start_offset);
|
||||
_sst._pi_write.numblocks++;
|
||||
_sst._pi_write.block_first_colname = {};
|
||||
}
|
||||
write_index_promoted(_index, _sst._pi_write.data, _sst._pi_write.deltime,
|
||||
_sst._pi_write.numblocks);
|
||||
_sst._pi_write.data = {};
|
||||
|
||||
ensure_tombstone_is_written();
|
||||
int16_t end_of_row = 0;
|
||||
write(_out, end_of_row);
|
||||
|
||||
@@ -407,6 +407,27 @@ private:
|
||||
uint64_t _filter_file_size = 0;
|
||||
uint64_t _bytes_on_disk = 0;
|
||||
|
||||
// _pi_write is used temporarily for building the promoted
|
||||
// index (column sample) of one partition when writing a new sstable.
|
||||
struct {
|
||||
// Unfortunately we cannot output the promoted index directly to the
|
||||
// index file because it needs to be prepended by its size.
|
||||
bytes_ostream data;
|
||||
uint32_t numblocks;
|
||||
deletion_time deltime;
|
||||
uint64_t block_start_offset;
|
||||
uint64_t block_next_start_offset;
|
||||
bytes block_first_colname;
|
||||
bytes block_last_colname;
|
||||
std::experimental::optional<range_tombstone_accumulator> tombstone_accumulator;
|
||||
const schema* schemap;
|
||||
size_t desired_block_size;
|
||||
} _pi_write;
|
||||
|
||||
void maybe_flush_pi_block(file_writer& out,
|
||||
const composite& clustering_key,
|
||||
const std::vector<bytes_view>& column_names);
|
||||
|
||||
sstring _ks;
|
||||
sstring _cf;
|
||||
sstring _dir;
|
||||
|
||||
Reference in New Issue
Block a user