Merge "Be able to boot without a Summary" from Glauber

"Summary files are a relatively recent addition to Cassandra. I thought
that every SSTable converted to 2.1 would have them, but that does not
seem to be true. It's easy to generate a stream of files that will boot
in Cassandra 2.1 just fine, but not in Scylla as they will be missing
the Summary.

Cassandra can boot those files because they are robust against the Summary
not existing, and we should do the same.

Since we keep the Summary in memory, in case one does not exist we create a
memory copy of it from the Index - the filesystem is not touched. Hopefully,
compaction will run soon and the next time we boot we won't have to do such
thing.

Fixes #1170"
This commit is contained in:
Pekka Enberg
2016-04-09 20:38:57 +03:00
5 changed files with 105 additions and 32 deletions

View File

@@ -493,8 +493,9 @@ future<sstables::entry_descriptor> column_family::probe_file(sstring sstdir, sst
}
}
auto fut = sstable::get_sstable_key_range(*_schema, _schema->ks_name(), _schema->cf_name(), sstdir, comps.generation, comps.version, comps.format);
return std::move(fut).then([this, sstdir = std::move(sstdir), comps] (range<partition_key> r) {
auto sst = std::make_unique<sstables::sstable>(_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation, comps.version, comps.format);
auto fut = sst->get_sstable_key_range(*_schema);
return std::move(fut).then([this, sst = std::move(sst), sstdir = std::move(sstdir), comps] (range<partition_key> r) mutable {
// Checks whether or not sstable belongs to current shard.
if (!belongs_to_current_shard(*_schema, std::move(r))) {
dblog.debug("sstable {} not relevant for this shard, ignoring",
@@ -504,7 +505,6 @@ future<sstables::entry_descriptor> column_family::probe_file(sstring sstdir, sst
return make_ready_future<>();
}
auto sst = std::make_unique<sstables::sstable>(_schema->ks_name(), _schema->cf_name(), sstdir, comps.generation, comps.version, comps.format);
auto fut = sst->load();
return std::move(fut).then([this, sst = std::move(sst)] () mutable {
add_sstable(std::move(*sst));

View File

@@ -42,10 +42,12 @@ public:
}
};
class index_consume_entry_context: public data_consumer::continuous_data_consumer<index_consume_entry_context> {
template <class IndexConsumer>
class index_consume_entry_context: public data_consumer::continuous_data_consumer<index_consume_entry_context<IndexConsumer>> {
using proceed = data_consumer::proceed;
using parent = data_consumer::continuous_data_consumer<index_consume_entry_context<IndexConsumer>>;
private:
index_consumer& _consumer;
IndexConsumer& _consumer;
enum class state {
START,
@@ -66,7 +68,7 @@ public:
bool non_consuming() const {
return ((_state == state::CONSUME_ENTRY) || (_state == state::START) ||
((_state == state::PROMOTED_BYTES) && (_prestate == prestate::NONE)));
((_state == state::PROMOTED_BYTES) && (parent::_prestate == parent::prestate::NONE)));
}
proceed process_state(temporary_buffer<char>& data) {
@@ -79,32 +81,32 @@ public:
_state = state::KEY_SIZE;
break;
case state::KEY_SIZE:
if (read_16(data) != read_status::ready) {
if (parent::read_16(data) != parent::read_status::ready) {
_state = state::KEY_BYTES;
break;
}
case state::KEY_BYTES:
if (read_bytes(data, _u16, _key) != read_status::ready) {
if (parent::read_bytes(data, parent::_u16, _key) != parent::read_status::ready) {
_state = state::POSITION;
break;
}
case state::POSITION:
if (read_64(data) != read_status::ready) {
if (parent::read_64(data) != parent::read_status::ready) {
_state = state::PROMOTED_SIZE;
break;
}
case state::PROMOTED_SIZE:
if (read_32(data) != read_status::ready) {
if (parent::read_32(data) != parent::read_status::ready) {
_state = state::PROMOTED_BYTES;
break;
}
case state::PROMOTED_BYTES:
if (read_bytes(data, _u32, _promoted) != read_status::ready) {
if (parent::read_bytes(data, parent::_u32, _promoted) != parent::read_status::ready) {
_state = state::CONSUME_ENTRY;
break;
}
case state::CONSUME_ENTRY:
_consumer.consume_entry(index_entry(std::move(_key), _u64, std::move(_promoted)));
_consumer.consume_entry(index_entry(std::move(_key), parent::_u64, std::move(_promoted)));
_state = state::START;
break;
default:
@@ -113,9 +115,9 @@ public:
return proceed::yes;
}
index_consume_entry_context(index_consumer& consumer,
index_consume_entry_context(IndexConsumer& consumer,
input_stream<char>&& input, uint64_t maxlen)
: continuous_data_consumer(std::move(input), maxlen)
: parent(std::move(input), maxlen)
, _consumer(consumer)
{}

View File

@@ -696,6 +696,10 @@ inline void write(file_writer& out, estimated_histogram& eh) {
// This is small enough, and well-defined. Easier to just read it all
// at once
future<> sstable::read_toc() {
if (_components.size()) {
return make_ready_future<>();
}
auto file_path = filename(sstable::component_type::TOC);
sstlog.debug("Reading TOC file {} ", file_path);
@@ -881,7 +885,7 @@ future<index_list> sstable::read_indexes(uint64_t summary_idx, const io_priority
auto stream = make_file_input_stream(this->_index_file, position, end - position, std::move(options));
// TODO: it's redundant to constrain the consumer here to stop at
// index_size()-position, the input stream is already constrained.
auto ctx = make_lw_shared<index_consume_entry_context>(ic, std::move(stream), this->index_size() - position);
auto ctx = make_lw_shared<index_consume_entry_context<index_consumer>>(ic, std::move(stream), this->index_size() - position);
return ctx->consume_input(*ctx).then([ctx, &ic] {
return make_ready_future<index_list>(std::move(ic.indexes));
});
@@ -954,6 +958,20 @@ void sstable::write_statistics(const io_priority_class& pc) {
write_simple<component_type::Statistics>(_statistics, pc);
}
future<> sstable::read_summary(const io_priority_class& pc) {
if (_summary) {
return make_ready_future<>();
}
return read_toc().then([this, &pc] {
if (has_component(sstable::component_type::Summary)) {
return read_simple<component_type::Summary>(_summary, pc);
} else {
return generate_summary(default_priority_class());
}
});
}
future<> sstable::open_data() {
return when_all(open_checked_file_dma(sstable_read_error, filename(component_type::Index), open_flags::ro),
open_checked_file_dma(sstable_read_error, filename(component_type::Data), open_flags::ro))
@@ -1227,10 +1245,9 @@ static void write_index_entry(file_writer& out, disk_string_view<uint16_t>& key,
write(out, key, pos, promoted_index_size);
}
static void prepare_summary(summary& s, uint64_t expected_partition_count, const schema& schema) {
static void prepare_summary(summary& s, uint64_t expected_partition_count, uint32_t min_index_interval) {
assert(expected_partition_count >= 1);
auto min_index_interval = schema.min_index_interval();
s.header.min_index_interval = min_index_interval;
s.header.sampling_level = downsampling::BASE_SAMPLING_LEVEL;
uint64_t max_expected_entries =
@@ -1247,8 +1264,7 @@ static void prepare_summary(summary& s, uint64_t expected_partition_count, const
static void seal_summary(summary& s,
std::experimental::optional<key>&& first_key,
std::experimental::optional<key>&& last_key,
const schema& schema) {
std::experimental::optional<key>&& last_key) {
s.header.size = s.entries.size();
s.header.size_at_full_sampling = s.header.size;
@@ -1337,7 +1353,7 @@ void sstable::do_write_components(::mutation_reader mr,
auto filter_fp_chance = schema->bloom_filter_fp_chance();
_filter = utils::i_filter::get_filter(estimated_partitions, filter_fp_chance);
prepare_summary(_summary, estimated_partitions, *schema);
prepare_summary(_summary, estimated_partitions, schema->min_index_interval());
// FIXME: we may need to set repaired_at stats at this point.
@@ -1417,7 +1433,7 @@ void sstable::do_write_components(::mutation_reader mr,
}
}
seal_summary(_summary, std::move(first_key), std::move(last_key), *schema);
seal_summary(_summary, std::move(first_key), std::move(last_key));
index->close().get();
_index_file = file(); // index->close() closed _index_file
@@ -1490,6 +1506,55 @@ future<> sstable::write_components(::mutation_reader mr,
});
}
future<> sstable::generate_summary(const io_priority_class& pc) {
if (_summary) {
return make_ready_future<>();
}
sstlog.info("Summary file {} not found. Generating Summary...", filename(sstable::component_type::Summary));
class summary_generator {
summary& _summary;
public:
std::experimental::optional<key> first_key, last_key;
summary_generator(summary& s) : _summary(s) {}
bool should_continue() {
return true;
}
void consume_entry(index_entry&& ie) {
maybe_add_summary_entry(_summary, ie.get_key_bytes(), ie.position());
if (!first_key) {
first_key = key(to_bytes(ie.get_key_bytes()));
} else {
last_key = key(to_bytes(ie.get_key_bytes()));
}
}
};
return open_checked_file_dma(sstable_read_error, filename(component_type::Index), open_flags::ro).then([this, &pc] (file index_file) {
return do_with(std::move(index_file), [this, &pc] (file index_file) {
return index_file.size().then([this, &pc, index_file] (auto size) {
// an upper bound. Surely to be less than this.
auto estimated_partitions = size / sizeof(uint64_t);
// Since we don't have a summary, use a default min_index_interval, and if needed we'll resample
// later.
prepare_summary(_summary, estimated_partitions, 0x80);
file_input_stream_options options;
options.buffer_size = sstable_buffer_size;
options.io_priority_class = pc;
auto stream = make_file_input_stream(index_file, 0, size, std::move(options));
return do_with(summary_generator(_summary), [this, &pc, stream = std::move(stream), size] (summary_generator& s) mutable {
auto ctx = make_lw_shared<index_consume_entry_context<summary_generator>>(s, std::move(stream), size);
return ctx->consume_input(*ctx).then([this, ctx, &s] {
seal_summary(_summary, std::move(s.first_key), std::move(s.last_key));
});
});
});
});
});
}
uint64_t sstable::data_size() const {
if (has_component(sstable::component_type::CompressionInfo)) {
return _compression.data_len;
@@ -1940,12 +2005,11 @@ sstable::remove_sstable_with_temp_toc(sstring ks, sstring cf, sstring dir, int64
}
future<range<partition_key>>
sstable::get_sstable_key_range(const schema& s, sstring ks, sstring cf, sstring dir, int64_t generation, version_types v, format_types f) {
auto sst = std::make_unique<sstable>(ks, cf, dir, generation, v, f);
auto fut = sst->read_summary(default_priority_class());
return std::move(fut).then([sst = std::move(sst), &s] () mutable {
auto first = sst->get_first_partition_key(s);
auto last = sst->get_last_partition_key(s);
sstable::get_sstable_key_range(const schema& s) {
auto fut = read_summary(default_priority_class());
return std::move(fut).then([this, &s] () mutable {
auto first = get_first_partition_key(s);
auto last = get_last_partition_key(s);
return make_ready_future<range<partition_key>>(range<partition_key>::make(first, last));
});
}

View File

@@ -399,13 +399,16 @@ private:
void write_filter(const io_priority_class& pc);
future<> read_summary(const io_priority_class& pc) {
return read_simple<component_type::Summary>(_summary, pc);
}
future<> read_summary(const io_priority_class& pc);
void write_summary(const io_priority_class& pc) {
write_simple<component_type::Summary>(_summary, pc);
}
// To be called when we try to load an SSTable that lacks a Summary. Could
// happen if old tools are being used.
future<> generate_summary(const io_priority_class& pc);
future<> read_statistics(const io_priority_class& pc);
void write_statistics(const io_priority_class& pc);
@@ -537,8 +540,8 @@ public:
}
// Return sstable key range as range<partition_key> reading only the summary component.
static future<range<partition_key>>
get_sstable_key_range(const schema& s, sstring ks, sstring cf, sstring dir, int64_t generation, version_types v, format_types f);
future<range<partition_key>>
get_sstable_key_range(const schema& s);
// Used to mark a sstable for deletion that is not relevant to the current shard.
// It doesn't mean that the sstable will be deleted, but that the sstable is not

View File

@@ -144,6 +144,10 @@ struct summary_ka {
uint64_t memory_footprint() const {
return sizeof(summary_entry) * entries.size() + sizeof(uint32_t) * positions.size() + sizeof(*this);
}
explicit operator bool() const {
return entries.size();
}
};
using summary = summary_ka;